You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/08/31 00:53:45 UTC
[5/5] git commit: reflect need for store to set messageId.setFutureOrSequenceLong for journaled jdbc
reflect need for store to set messageId.setFutureOrSequenceLong for journaled jdbc
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e8f81551
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e8f81551
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e8f81551
Branch: refs/heads/trunk
Commit: e8f8155141e75ba10c1ba03d9031b6128a735507
Parents: 8a37f97
Author: gtully <ga...@gmail.com>
Authored: Sat Aug 30 23:51:59 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Sat Aug 30 23:51:59 2014 +0100
----------------------------------------------------------------------
.../activemq/store/journal/JournalMessageStore.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e8f81551/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
index 08276d3..2d44769 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
@@ -34,6 +34,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@@ -88,7 +89,7 @@ public class JournalMessageStore extends AbstractMessageStore {
* Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing.
*/
- public void addMessage(ConnectionContext context, final Message message) throws IOException {
+ public void addMessage(final ConnectionContext context, final Message message) throws IOException {
final MessageId id = message.getMessageId();
@@ -100,7 +101,7 @@ public class JournalMessageStore extends AbstractMessageStore {
if (debug) {
LOG.debug("Journalled message add for: " + id + ", at: " + location);
}
- addMessage(message, location);
+ addMessage(context, message, location);
} else {
if (debug) {
LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
@@ -116,7 +117,7 @@ public class JournalMessageStore extends AbstractMessageStore {
}
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
- addMessage(message, location);
+ addMessage(context, message, location);
}
}
@@ -133,11 +134,15 @@ public class JournalMessageStore extends AbstractMessageStore {
}
}
- void addMessage(final Message message, final RecordLocation location) {
+ void addMessage(ConnectionContext context, final Message message, final RecordLocation location) {
synchronized (this) {
lastLocation = location;
MessageId id = message.getMessageId();
messages.put(id, message);
+ message.getMessageId().setFutureOrSequenceLong(0l);
+ if (indexListener != null) {
+ indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
+ }
}
}