You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/09/23 14:15:49 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5960 - rework fix to reset the next sequence so that the next ack position and message reference get cleared up in normal operation

Repository: activemq
Updated Branches:
  refs/heads/master 86c826c46 -> fcabcd282


https://issues.apache.org/jira/browse/AMQ-5960 - rework fix to reset the next sequence so that the next ack position and message reference get cleared up in normal operation


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fcabcd28
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fcabcd28
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fcabcd28

Branch: refs/heads/master
Commit: fcabcd282dc01c6faaff3c7627882f42a64543b4
Parents: 86c826c
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 23 13:15:29 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Sep 23 13:15:29 2015 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 21 +++++++++-----------
 1 file changed, 9 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fcabcd28/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 815b9df..3512190 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1319,7 +1319,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
         // Add the message.
         int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
-        long id = sd.orderIndex.getNextMessageId(priority);
+        long id = sd.orderIndex.getNextMessageId();
         Long previous = sd.locationIndex.put(tx, location, id);
         if (previous == null) {
             previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
@@ -1346,16 +1346,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             // added message. We don't want to assign it a new id as the other indexes would
             // be wrong..
             sd.locationIndex.put(tx, location, previous);
+            // ensure sequence is not broken
+            sd.orderIndex.revertNextMessageId();
             metadata.lastUpdate = location;
-            // remove ack positions
-            if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
-                Iterator<Entry<String, SequenceSet>> it = sd.ackPositions.iterator(tx);
-                while (it.hasNext()) {
-                    Entry<String, SequenceSet> entry = it.next();
-                    entry.getValue().remove(id);
-                }
-            }
-
         }
         // record this id in any event, initial send or recovery
         metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
@@ -1443,7 +1436,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 removeAckLocation(command, tx, sd, subscriptionKey, sequence);
                 metadata.lastUpdate = ackLocation;
             } else if (LOG.isDebugEnabled()) {
-                LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
+                LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
             }
 
         }
@@ -3183,10 +3176,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             deletes.add(iterator.next());
         }
 
-        long getNextMessageId(int priority) {
+        long getNextMessageId() {
             return nextMessageId++;
         }
 
+        void revertNextMessageId() {
+            nextMessageId--;
+        }
+
         MessageKeys get(Transaction tx, Long key) throws IOException {
             MessageKeys result = defaultPriorityIndex.get(tx, key);
             if (result == null) {