You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/01/04 12:00:32 UTC
svn commit: r492517 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick:
QuickMessageStore.java QuickPersistenceAdapter.java
QuickTopicMessageStore.java
Author: chirino
Date: Thu Jan 4 03:00:31 2007
New Revision: 492517
URL: http://svn.apache.org/viewvc?view=rev&rev=492517
Log:
Quick Store bug fixes, and added a benchmark test
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java?view=diff&rev=492517&r1=492516&r2=492517
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickMessageStore.java Thu Jan 4 03:00:31 2007
@@ -157,7 +157,7 @@
}
}
- public void replayAddMessage(ConnectionContext context, Message message, Location location) {
+ public boolean replayAddMessage(ConnectionContext context, Message message, Location location) {
MessageId id = message.getMessageId();
try {
// Only add the message if it has not already been added.
@@ -168,13 +168,13 @@
data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset());
referenceStore.addMessageReference(context, id, data);
- System.out.println("referenceStore.put "+id+"-->"+data);
-
+ return true;
}
}
catch (Throwable e) {
log.warn("Could not replay add for message '" + id + "'. Message may have already been added. reason: " + e,e);
}
+ return false;
}
/**
@@ -238,17 +238,19 @@
}
}
- public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
+ public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
try {
// Only remove the message if it has not already been removed.
ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
if( t!=null ) {
referenceStore.removeMessage(context, messageAck);
+ return true;
}
}
catch (Throwable e) {
log.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'. Message may have already been acknowledged. reason: " + e);
}
+ return false;
}
/**
@@ -335,7 +337,6 @@
Entry<MessageId, ReferenceData> entry = iterator.next();
try {
referenceStore.addMessageReference(context, entry.getKey(), entry.getValue() );
- System.out.println("referenceStore.put "+entry.getKey()+"-->"+entry.getValue());
} catch (Throwable e) {
log.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
@@ -398,7 +399,6 @@
if( data==null ) {
data = referenceStore.getMessageReference(identity);
- System.out.println("referenceStore.get "+identity+"-->"+data);
if( data==null ) {
return null;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java?view=diff&rev=492517&r1=492516&r2=492517
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickPersistenceAdapter.java Thu Jan 4 03:00:31 2007
@@ -422,9 +422,10 @@
private void recover() throws IllegalStateException, IOException {
Location pos = null;
- int transactionCounter = 0;
+ int redoCounter = 0;
log.info("Journal Recovery Started from: " + asyncDataManager);
+ long start = System.currentTimeMillis();
ConnectionContext context = new ConnectionContext();
// While we have records in the journal.
@@ -439,8 +440,9 @@
transactionStore.addMessage(store, message, pos);
}
else {
- store.replayAddMessage(context, message, pos);
- transactionCounter++;
+ if( store.replayAddMessage(context, message, pos) ) {
+ redoCounter++;
+ }
}
} else {
switch (c.getDataStructureType()) {
@@ -452,8 +454,9 @@
transactionStore.removeMessage(store, command.getMessageAck(), pos);
}
else {
- store.replayRemoveMessage(context, command.getMessageAck());
- transactionCounter++;
+ if( store.replayRemoveMessage(context, command.getMessageAck()) ) {
+ redoCounter++;
+ }
}
}
break;
@@ -465,8 +468,9 @@
transactionStore.acknowledge(store, command, pos);
}
else {
- store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
- transactionCounter++;
+ if( store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId()) ) {
+ redoCounter++;
+ }
}
}
break;
@@ -491,18 +495,20 @@
for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
TxOperation op = (TxOperation) iter.next();
if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
- op.store.replayAddMessage(context, (Message)op.data, op.location);
+ if( op.store.replayAddMessage(context, (Message)op.data, op.location) )
+ redoCounter++;
}
if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
- op.store.replayRemoveMessage(context, (MessageAck) op.data);
+ if( op.store.replayRemoveMessage(context, (MessageAck) op.data) )
+ redoCounter++;
}
if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
JournalTopicAck ack = (JournalTopicAck) op.data;
- ((QuickTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack
- .getMessageId());
+ if( ((QuickTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId()) ) {
+ redoCounter++;
+ }
}
}
- transactionCounter++;
break;
case JournalTransaction.LOCAL_ROLLBACK:
case JournalTransaction.XA_ROLLBACK:
@@ -524,11 +530,11 @@
}
}
}
-
Location location = writeTraceMessage("RECOVERED "+new Date(), true);
asyncDataManager.setMark(location, true);
+ long end = System.currentTimeMillis();
- log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
+ log.info("Recovered " + redoCounter + " operations from redo log in "+((end-start)/1000.0f)+" seconds.");
}
private IOException createReadException(Location location, Exception e) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java?view=diff&rev=492517&r1=492516&r2=492517
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/quick/QuickTopicMessageStore.java Thu Jan 4 03:00:31 2007
@@ -120,16 +120,18 @@
}
- public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
+ public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
try {
SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
if( sub != null ) {
topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
+ return true;
}
}
catch (Throwable e) {
log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
}
+ return false;
}