You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/08/31 00:53:45 UTC

[5/5] git commit: reflect need for store to set messageId.setFutureOrSequenceLong for journaled jdbc

Reflect the need for the store to call messageId.setFutureOrSequenceLong in the journaled JDBC message store.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e8f81551
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e8f81551
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e8f81551

Branch: refs/heads/trunk
Commit: e8f8155141e75ba10c1ba03d9031b6128a735507
Parents: 8a37f97
Author: gtully <ga...@gmail.com>
Authored: Sat Aug 30 23:51:59 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Sat Aug 30 23:51:59 2014 +0100

----------------------------------------------------------------------
 .../activemq/store/journal/JournalMessageStore.java    | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e8f81551/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
index 08276d3..2d44769 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
@@ -34,6 +34,7 @@ import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+import org.apache.activemq.store.IndexListener;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -88,7 +89,7 @@ public class JournalMessageStore extends AbstractMessageStore {
      * Not synchronized since the Journal has better throughput if you increase
      * the number of concurrent writes that it is doing.
      */
-    public void addMessage(ConnectionContext context, final Message message) throws IOException {
+    public void addMessage(final ConnectionContext context, final Message message) throws IOException {
 
         final MessageId id = message.getMessageId();
 
@@ -100,7 +101,7 @@ public class JournalMessageStore extends AbstractMessageStore {
             if (debug) {
                 LOG.debug("Journalled message add for: " + id + ", at: " + location);
             }
-            addMessage(message, location);
+            addMessage(context, message, location);
         } else {
             if (debug) {
                 LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
@@ -116,7 +117,7 @@ public class JournalMessageStore extends AbstractMessageStore {
                     }
                     synchronized (JournalMessageStore.this) {
                         inFlightTxLocations.remove(location);
-                        addMessage(message, location);
+                        addMessage(context, message, location);
                     }
                 }
 
@@ -133,11 +134,15 @@ public class JournalMessageStore extends AbstractMessageStore {
         }
     }
 
-    void addMessage(final Message message, final RecordLocation location) {
+    void addMessage(ConnectionContext context, final Message message, final RecordLocation location) {
         synchronized (this) {
             lastLocation = location;
             MessageId id = message.getMessageId();
             messages.put(id, message);
+            message.getMessageId().setFutureOrSequenceLong(0l);
+            if (indexListener != null) {
+                indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
+            }
         }
     }