You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/01/04 12:00:32 UTC

svn commit: r492517 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick: QuickMessageStore.java QuickPersistenceAdapter.java QuickTopicMessageStore.java

Author: chirino
Date: Thu Jan  4 03:00:31 2007
New Revision: 492517

URL: http://svn.apache.org/viewvc?view=rev&rev=492517
Log:
Quick Store bug fixes; also added a benchmark test

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java?view=diff&rev=492517&r1=492516&r2=492517
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java Thu Jan  4 03:00:31 2007
@@ -157,7 +157,7 @@
 		}
     }
     
-    public void replayAddMessage(ConnectionContext context, Message message, Location location) {
+    public boolean replayAddMessage(ConnectionContext context, Message message, Location location) {
     	MessageId id = message.getMessageId();
         try {
             // Only add the message if it has not already been added.
@@ -168,13 +168,13 @@
             	data.setFileId(location.getDataFileId());
             	data.setOffset(location.getOffset());
                 referenceStore.addMessageReference(context, id, data);
-            	System.out.println("referenceStore.put "+id+"-->"+data);
-                
+                return true;
             }
         }
         catch (Throwable e) {
             log.warn("Could not replay add for message '" + id + "'.  Message may have already been added. reason: " + e,e);
         }
+        return false;
     }    
     
     /**
@@ -238,17 +238,19 @@
         }
     }
     
-    public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
+    public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
         try {
             // Only remove the message if it has not already been removed.
             ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
             if( t!=null ) {
                 referenceStore.removeMessage(context, messageAck);
+                return true;
             }
         }
         catch (Throwable e) {
             log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
         }
+        return false;
     }
     
     /**
@@ -335,7 +337,6 @@
                 	Entry<MessageId, ReferenceData> entry = iterator.next();
                     try {
                     	referenceStore.addMessageReference(context, entry.getKey(), entry.getValue() );
-                    	System.out.println("referenceStore.put "+entry.getKey()+"-->"+entry.getValue());
                     } catch (Throwable e) {
                         log.warn("Message could not be added to long term store: " + e.getMessage(), e);
                     }
@@ -398,7 +399,6 @@
         
         if( data==null ) {
         	data = referenceStore.getMessageReference(identity);
-        	System.out.println("referenceStore.get "+identity+"-->"+data);
             if( data==null ) {
             	return null;
             }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java?view=diff&rev=492517&r1=492516&r2=492517
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java Thu Jan  4 03:00:31 2007
@@ -422,9 +422,10 @@
     private void recover() throws IllegalStateException, IOException {
 
         Location pos = null;
-        int transactionCounter = 0;
+        int redoCounter = 0;
 
         log.info("Journal Recovery Started from: " + asyncDataManager);
+        long start = System.currentTimeMillis();
         ConnectionContext context = new ConnectionContext();
 
         // While we have records in the journal.
@@ -439,8 +440,9 @@
                     transactionStore.addMessage(store, message, pos);
                 }
                 else {
-                    store.replayAddMessage(context, message, pos);
-                    transactionCounter++;
+                    if( store.replayAddMessage(context, message, pos) ) {
+                    	redoCounter++;
+                    }
                 }
             } else {
                 switch (c.getDataStructureType()) {
@@ -452,8 +454,9 @@
                         transactionStore.removeMessage(store, command.getMessageAck(), pos);
                     }
                     else {
-                        store.replayRemoveMessage(context, command.getMessageAck());
-                        transactionCounter++;
+                        if( store.replayRemoveMessage(context, command.getMessageAck()) ) {
+                        	redoCounter++;
+                        }
                     }
                 }
                 break;
@@ -465,8 +468,9 @@
                         transactionStore.acknowledge(store, command, pos);
                     }
                     else {
-                        store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
-                        transactionCounter++;
+                        if( store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()) ) {
+                        	redoCounter++;
+                        }
                     }
                 }
                 break;
@@ -491,18 +495,20 @@
                             for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
                                 TxOperation op = (TxOperation) iter.next();
                                 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
-                                    op.store.replayAddMessage(context, (Message)op.data, op.location);
+                                    if( op.store.replayAddMessage(context, (Message)op.data, op.location) )
+                                        redoCounter++;
                                 }
                                 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
-                                    op.store.replayRemoveMessage(context, (MessageAck) op.data);
+                                    if( op.store.replayRemoveMessage(context, (MessageAck) op.data) )
+                                        redoCounter++;
                                 }
                                 if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
                                     JournalTopicAck ack = (JournalTopicAck) op.data;
-                                    ((QuickTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
-                                            .getMessageId());
+                                    if( ((QuickTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()) ) {
+                                        redoCounter++;
+                                    }
                                 }
                             }
-                            transactionCounter++;
                             break;
                         case JournalTransaction.LOCAL_ROLLBACK:
                         case JournalTransaction.XA_ROLLBACK:
@@ -524,11 +530,11 @@
                 }
             }
         }
-
         Location location = writeTraceMessage("RECOVERED "+new Date(), true);
         asyncDataManager.setMark(location, true);
+        long end = System.currentTimeMillis();
 
-        log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
+        log.info("Recovered " + redoCounter + " operations from redo log in "+((end-start)/1000.0f)+" seconds.");
     }
 
     private IOException createReadException(Location location, Exception e) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java?view=diff&rev=492517&r1=492516&r2=492517
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java Thu Jan  4 03:00:31 2007
@@ -120,16 +120,18 @@
         
     }
     
-    public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
+    public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
         try {
             SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
             if( sub != null ) {
                 topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
+                return true;
             }
         }
         catch (Throwable e) {
             log.debug("Could not replay acknowledge for message '" + messageId + "'.  Message may have already been acknowledged. reason: " + e);
         }
+        return false;
     }