You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2020/08/26 13:40:05 UTC

[activemq] branch master updated: AMQ-8023 - retain sync add call, duplicate sub suppression depends on it, regression in AMQ3274Test

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new c0e6d47  AMQ-8023 - retain sync add call, duplicate sub suppression depends on it, regression in AMQ3274Test
c0e6d47 is described below

commit c0e6d47121c1fdf81ab86fb4272464219f947141
Author: gtully <ga...@gmail.com>
AuthorDate: Wed Aug 26 14:39:43 2020 +0100

    AMQ-8023 - retain sync add call, duplicate sub suppression depends on it, regression in AMQ3274Test
---
 .../activemq/network/DemandForwardingBridgeSupport.java     | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 2850029..6441c92 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1151,7 +1152,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     protected void addSubscription(final DemandSubscription sub) throws IOException {
         if (sub != null) {
             // Serialize with remove operations such that new sub does not cause remove/purge to fail
-            serialExecutor.execute(new Runnable() {
+            // remain synchronous b/c duplicate suppression depends on add completion
+            FutureTask syncTask = new FutureTask(new Runnable() {
                 @Override
                 public void run() {
                     try {
@@ -1161,7 +1163,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                         LOG.debug("detail", e);
                     }
                 }
-            });
+            }, null);
+            try {
+                serialExecutor.execute(syncTask);
+                syncTask.get();
+            } catch (Exception e) {
+                LOG.warn("failed to execute add sub command: {}, cause: {}", sub.getLocalInfo(), e);
+                LOG.debug("detail", e);
+            }
         }
     }