You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/09/11 18:08:06 UTC

[2/3] git commit: https://issues.apache.org/jira/browse/AMQ-5266 - fix edge case with optimizedDispatch=true where a single message could be pending till the next page in event

https://issues.apache.org/jira/browse/AMQ-5266 - fix edge case with optimizedDispatch=true where a single message could be pending till the next page in event


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5861d86a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5861d86a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5861d86a

Branch: refs/heads/trunk
Commit: 5861d86ad39cac1644b1a48157bd6c799a586ac4
Parents: 26807cd
Author: gtully <ga...@gmail.com>
Authored: Thu Sep 11 16:59:50 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Sep 11 16:59:50 2014 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  6 ++--
 .../org/apache/activemq/bugs/AMQ5266Test.java   | 29 ++++++++++++--------
 2 files changed, 20 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5861d86a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index ff16dfc..c7f768e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -781,12 +781,12 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
             sendLock.unlock();
         }
         for (MessageContext messageContext : orderedUpdates) {
-            if (!messageContext.duplicate) {
-                messageSent(messageContext.context, messageContext.message);
-            }
             if (messageContext.onCompletion != null) {
                 messageContext.onCompletion.run();
             }
+            if (!messageContext.duplicate) {
+                messageSent(messageContext.context, messageContext.message);
+            }
         }
         orderedUpdates.clear();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/5861d86a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
index 626fe6e..efccefa 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
@@ -87,22 +87,26 @@ public class AMQ5266Test {
     @Parameterized.Parameter(5)
     public boolean useDefaultStore = false;
 
-    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5}")
+    @Parameterized.Parameter(6)
+    public boolean optimizeDispatch = false;
+
+    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},useDefaultStore:{5},optimizedDispatch:{6}")
     public static Iterable<Object[]> parameters() {
         return Arrays.asList(new Object[][]{
                 // jdbc
-                {1000, 20,  5,   50*1024,   true,  false},
-                {100,  20,  5,   50*1024,   false, false},
-                {1000, 5,   20,  50*1024,   true,  false},
-                {1000, 20,  20,  1024*1024, true,  false},
-                {1000, 100, 100, 1024*1024, true,  false},
+                {1,    1,   1,   50*1024,   false, false, true},
+                {1000, 20,  5,   50*1024,   true,  false, false},
+                {100,  20,  5,   50*1024,   false, false, false},
+                {1000, 5,   20,  50*1024,   true,  false, false},
+                {1000, 20,  20,  1024*1024, true,  false, false},
 
                 // default store
-                {1000, 20,  5,   50*1024,   true,  true},
-                {100,  20,  5,   50*1024,   false, true},
-                {1000, 5,   20,  50*1024,   true,  true},
-                {1000, 20,  20,  1024*1024, true,  true},
-                {1000, 100, 100, 1024*1024, true,  true}
+                {1,    1,   1,   50*1024,   false, true, true},
+                {100,  5,   5,   50*1024,   false, true, false},
+                {1000, 20,  5,   50*1024,   true,  true, false},
+                {100,  20,  5,   50*1024,   false, true, false},
+                {1000, 5,   20,  50*1024,   true,  true, false},
+                {1000, 20,  20,  1024*1024, true,  true, false},
         });
     }
 
@@ -127,6 +131,7 @@ public class AMQ5266Test {
             kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(true);
         }
         brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setUseJmx(false);
 
 
         PolicyMap policyMap = new PolicyMap();
@@ -136,7 +141,7 @@ public class AMQ5266Test {
         defaultEntry.setEnableAudit(true);
         defaultEntry.setUseCache(useCache);
         defaultEntry.setMaxPageSize(1000);
-        defaultEntry.setOptimizedDispatch(false);
+        defaultEntry.setOptimizedDispatch(optimizeDispatch);
         defaultEntry.setMemoryLimit(destMemoryLimit);
         defaultEntry.setExpireMessagesPeriod(0);
         policyMap.setDefaultEntry(defaultEntry);