You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/05/18 10:37:54 UTC

activemq git commit: AMQ-6703 - fix regression in MBeanTest - StoreQueueCursor not sharing its audit - have purge use rollback and delegate to both cursors

Repository: activemq
Updated Branches:
  refs/heads/master 72613aaba -> 2eff835ee


AMQ-6703 - fix regression in MBeanTest - StoreQueueCursor not sharing its audit - have purge use rollback and delegate to both cursors


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2eff835e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2eff835e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2eff835e

Branch: refs/heads/master
Commit: 2eff835ee23138c31b01e2f560afe96e9094eb87
Parents: 72613aa
Author: gtully <ga...@gmail.com>
Authored: Fri May 18 11:36:54 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri May 18 11:36:54 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/activemq/broker/region/Queue.java     |  5 +----
 .../activemq/broker/region/cursors/StoreQueueCursor.java  | 10 +++++++---
 2 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2eff835e/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 4c84713..2101523 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1307,6 +1307,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                     try {
                         QueueMessageReference r = (QueueMessageReference) ref;
                         removeMessage(c, r);
+                        messages.rollback(r.getMessageId());
                     } catch (IOException e) {
                     }
                 }
@@ -1314,10 +1315,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                 // store
             } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
 
-            if (getMessages().getMessageAudit() != null) {
-                getMessages().getMessageAudit().clear();
-            }
-
             if (this.destinationStatistics.getMessages().getCount() > 0) {
                 LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount());
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2eff835e/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
index 0f7216c..3cb1cc2 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
@@ -20,6 +20,7 @@ import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.usage.SystemUsage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,9 +78,6 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
     public synchronized void stop() throws Exception {
         started = false;
         if (nonPersistent != null) {
-//            nonPersistent.clear();
-//            nonPersistent.stop();
-//            nonPersistent.gc();
           nonPersistent.destroy();
         }
         persistent.stop();
@@ -265,6 +263,12 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
     }
 
     @Override
+    public void rollback(MessageId id) {
+        nonPersistent.rollback(id);
+        persistent.rollback(id);
+    }
+
+    @Override
     public void setUseCache(boolean useCache) {
         super.setUseCache(useCache);
         if (persistent != null) {