You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/01/04 11:39:05 UTC

svn commit: r492511 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store: ./ kahadaptor/ quick/

Author: chirino
Date: Thu Jan  4 02:39:04 2007
New Revision: 492511

URL: http://svn.apache.org/viewvc?view=rev&rev=492511
Log:
Quick store bug fixes.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java?view=diff&rev=492511&r1=492510&r2=492511
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ReferenceStore.java Thu Jan  4 02:39:04 2007
@@ -54,6 +54,11 @@
 		public void setOffset(int offset) {
 			this.offset = offset;
 		}
+		
+		@Override
+		public String toString() {
+			return "ReferenceData fileId="+fileId+", offset="+offset+", expiration="+expiration;
+		}
 	}
 	
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?view=diff&rev=492511&r1=492510&r2=492511
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Thu Jan  4 02:39:04 2007
@@ -95,7 +95,7 @@
         }else{    
             for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
             	ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
-                if(msg.messageId.equals(identity)){
+                if(msg.messageId.equals(identity.toString())){
                     result=msg;
                     cache.put(identity,entry);
                     break;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java?view=diff&rev=492511&r1=492510&r2=492511
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java Thu Jan  4 02:39:04 2007
@@ -30,6 +30,7 @@
 
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.DataStructure;
 import org.apache.activemq.command.JournalQueueAck;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -167,6 +168,8 @@
             	data.setFileId(location.getDataFileId());
             	data.setOffset(location.getOffset());
                 referenceStore.addMessageReference(context, id, data);
+            	System.out.println("referenceStore.put "+id+"-->"+data);
+                
             }
         }
         catch (Throwable e) {
@@ -332,6 +335,7 @@
                 	Entry<MessageId, ReferenceData> entry = iterator.next();
                     try {
                     	referenceStore.addMessageReference(context, entry.getKey(), entry.getValue() );
+                    	System.out.println("referenceStore.put "+entry.getKey()+"-->"+entry.getValue());
                     } catch (Throwable e) {
                         log.warn("Message could not be added to long term store: " + e.getMessage(), e);
                     }
@@ -394,21 +398,23 @@
         
         if( data==null ) {
         	data = referenceStore.getMessageReference(identity);
+        	System.out.println("referenceStore.get "+identity+"-->"+data);
+            if( data==null ) {
+            	return null;
+            }
         }
-        
-        if( data==null ) {
-        	return null;
-        }
-        
-        Message answer = null;
-        if (answer != null ) {
-            return answer;
-        }
-        
+                
     	Location location = new Location();
     	location.setDataFileId(data.getFileId());
     	location.setOffset(data.getOffset());
-    	return (Message) peristenceAdapter.readCommand(location);
+    	
+    	DataStructure rc = peristenceAdapter.readCommand(location);
+    	
+    	try {
+			return (Message) rc;
+		} catch (ClassCastException e) {
+			throw new IOException("Could not read message "+identity+" at location "+location+", expected a message, but got: "+rc);
+		}
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java?view=diff&rev=492511&r1=492510&r2=492511
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java Thu Jan  4 02:39:04 2007
@@ -88,7 +88,7 @@
 
     private UsageManager usageManager;
 
-    private long cleanupInterval = 1000 * 10;
+    private long cleanupInterval = 1000 * 1/10;
     private long checkpointInterval = 1000 * 10;
     
     private int maxCheckpointWorkers = 1;
@@ -147,15 +147,7 @@
         referenceStoreAdapter.start();
     	
     	Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
-    	for (Integer fileId : files) {
-			try {
-				asyncDataManager.addInterestInFile(fileId);
-			} catch (IOException e) {
-				// We can expect these since referenceStoreAdapter is a litle behind in updates
-				// and it might think it has references to data files that have allready come and gone.. 
-				// This should get resolved once recovery kicks in.
-			}
-		}
+    	log.info("Active data files: "+files);
         
         checkpointTask = taskRunnerFactory.createTaskRunner(new Task(){
             public boolean iterate() {
@@ -172,8 +164,7 @@
 	        public void run() {
                 checkpoint(false);
 	        }
-	    };
-	    
+	    };	    
         Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval);
         
         periodicCleanupTask = new Runnable() {
@@ -193,6 +184,7 @@
         
         this.usageManager.removeUsageListener(this);        
         Scheduler.cancel(periodicCheckpointTask);
+        Scheduler.cancel(periodicCleanupTask);
 
         
         Iterator<QuickMessageStore> iterator = queues.values().iterator();
@@ -499,7 +491,7 @@
                             for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
                                 TxOperation op = (TxOperation) iter.next();
                                 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
-                                    op.store.replayAddMessage(context, (Message)op.data, pos);
+                                    op.store.replayAddMessage(context, (Message)op.data, op.location);
                                 }
                                 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
                                     op.store.replayRemoveMessage(context, (MessageAck) op.data);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java?view=diff&rev=492511&r1=492510&r2=492511
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTransactionStore.java Thu Jan  4 02:39:04 2007
@@ -56,11 +56,13 @@
         public byte operationType;
         public QuickMessageStore store;
         public Object data;
+        public Location location;
         
-        public TxOperation(byte operationType, QuickMessageStore store, Object data) {
+        public TxOperation(byte operationType, QuickMessageStore store, Object data, Location location) {
             this.operationType=operationType;
             this.store=store;
             this.data=data;
+            this.location=location;
         }
         
     }
@@ -77,16 +79,16 @@
             this.location=location;
         }
 
-        public void add(QuickMessageStore store, Message msg) {
-            operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
+        public void add(QuickMessageStore store, Message msg, Location location) {
+            operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg, location));
         }
 
         public void add(QuickMessageStore store, MessageAck ack) {
-            operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
+            operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack, null));
         }
 
         public void add(QuickTopicMessageStore store, JournalTopicAck ack) {
-            operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
+            operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack, null));
         }
         
         public Message[] getMessages() {
@@ -283,7 +285,7 @@
      */
     void addMessage(QuickMessageStore store, Message message, Location location) throws IOException {
         Tx tx = getTx(message.getTransactionId(), location);
-        tx.add(store, message);
+        tx.add(store, message, location);
     }
 
     /**