You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/02/02 14:02:49 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5542 fix (via revert below) and test case applied with thanks.

Repository: activemq
Updated Branches:
  refs/heads/trunk b35b8e37f -> b0a1bd833


https://issues.apache.org/jira/browse/AMQ-5542 fix (via revert below) and test case applied with thanks.

Revert "resolve https://issues.apache.org/activemq/browse/AMQ-2736, logic issue in code that keeps data files with acks around pending message file gc. thanks jgenender - test case to follow"

This reverts commit dd68c61e65f24b7dc498b36e34960a4bc46ded4b.

resolves: https://issues.apache.org/jira/browse/AMQ-5542 and applies test case that nicely demonstrates the defect, thanks


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b0a1bd83
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b0a1bd83
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b0a1bd83

Branch: refs/heads/trunk
Commit: b0a1bd833c2ea8e4ce2063636b0ee638f3f97f9b
Parents: b35b8e3
Author: gtully <ga...@gmail.com>
Authored: Fri Jan 30 14:45:26 2015 +0000
Committer: gtully <ga...@gmail.com>
Committed: Mon Feb 2 11:34:06 2015 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  3 +-
 .../org/apache/activemq/bugs/AMQ2832Test.java   | 51 ++++++++++++++++++++
 2 files changed, 52 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b0a1bd83/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 35a59ce..477f42c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1672,14 +1672,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             if (LOG.isTraceEnabled()) {
                 LOG.trace("gc candidates: " + gcCandidateSet);
             }
-            final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
             Iterator<Integer> candidates = gcCandidateSet.iterator();
             while (candidates.hasNext()) {
                 Integer candidate = candidates.next();
                 Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
                 if (referencedFileIds != null) {
                     for (Integer referencedFileId : referencedFileIds) {
-                        if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
+                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
                             // active file that is not targeted for deletion is referenced so don't delete
                             candidates.remove();
                             break;

http://git-wip-us.apache.org/repos/asf/activemq/blob/b0a1bd83/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
index 319fcc2..22ad6ab 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.bugs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -117,7 +118,57 @@ public class AMQ2832Test {
         }
     }
 
+   /**
+    * Scenario:
+    * db-1.log has an unacknowledged message,
+    * db-2.log contains acks for the messages from db-1.log,
+    * db-3.log contains acks for the messages from db-2.log
+    *
+    * Expected behavior: since db-1.log is blocked, db-2.log and db-3.log should not be removed during the cleanup.
+    * Current situation on 5.10.0, 5.10.1 is that db-3.log is removed causing all messages from db-2.log, whose acks were in db-3.log, to be replayed.
+    *
+    * @throws Exception
+    */
     @Test
+    public void testAckChain() throws Exception {
+       startBroker();
+
+       StagedConsumer consumer = new StagedConsumer();
+       // file #1
+       produceMessagesToConsumeMultipleDataFiles(5);
+       // acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log
+       consumer.receive(3);
+
+       // send messages by consuming and acknowledging every message right after sent in order to get KahadbAdd and Remove command to be saved together
+       // this is necessary in order to get KahaAddMessageCommand to be saved in one db file and the corresponding KahaRemoveMessageCommand in the next one
+       produceAndConsumeImmediately(20, consumer);
+       consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed
+
+       // now we have 3 files written and started with #4
+       consumer.close();
+
+       broker.stop();
+       broker.waitUntilStopped();
+
+       recoverBroker();
+
+       consumer = new StagedConsumer();
+       Message message = consumer.receive(1);
+       assertNotNull("One message stays unacked from db-1.log", message);
+       message.acknowledge();
+       message = consumer.receive(1);
+       assertNull("There should not be any unconsumed messages any more", message);
+       consumer.close();
+   }
+
+   private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws Exception {
+      for (int i = 0; i < numOfMsgs; i++) {
+         produceMessagesToConsumeMultipleDataFiles(1);
+         consumer.receive(1).acknowledge();
+      }
+   }
+
+   @Test
     public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
 
         startBroker();