You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/12/11 12:14:02 UTC

git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4924

Updated Branches:
  refs/heads/trunk ff0dd5a91 -> 2bbfbcfb2


Fix for https://issues.apache.org/jira/browse/AMQ-4924


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2bbfbcfb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2bbfbcfb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2bbfbcfb

Branch: refs/heads/trunk
Commit: 2bbfbcfb29463f9723a0efe548d46af7ddb3fbb6
Parents: ff0dd5a
Author: rajdavies <ra...@gmail.com>
Authored: Wed Dec 11 11:13:23 2013 +0000
Committer: rajdavies <ra...@gmail.com>
Committed: Wed Dec 11 11:13:45 2013 +0000

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  | 144 +++++---
 .../network/NetworkBridgeConfiguration.java     |   9 +
 .../CheckDuplicateMessagesOnDuplexTest.java     | 356 +++++++++++++++++++
 3 files changed, 452 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2bbfbcfb/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 1126d22..555f0ec 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -19,7 +19,11 @@ package org.apache.activemq.network;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.security.cert.X509Certificate;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -32,7 +36,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.ObjectName;
-
 import org.apache.activemq.DestinationDoesNotExistException;
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisoryBroker;
@@ -96,14 +99,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     protected ActiveMQDestination[] durableDestinations;
     protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
     protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
-    protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
+    protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
     protected final CountDownLatch startedLatch = new CountDownLatch(2);
     protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
     protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
     protected NetworkBridgeConfiguration configuration;
     protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
 
-    protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
+    protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
     protected BrokerId remoteBrokerId;
 
     final AtomicLong enqueueCounter = new AtomicLong();
@@ -251,7 +254,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                                 serialExecutor.shutdown();
                                 if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                                     List<Runnable> pendingTasks = serialExecutor.shutdownNow();
-                                        LOG.info("pending tasks on stop {}", pendingTasks);
+                                    LOG.info("pending tasks on stop {}", pendingTasks);
                                 }
                                 localBroker.oneway(new ShutdownInfo());
                                 remoteBroker.oneway(new ShutdownInfo());
@@ -292,7 +295,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             public void run() {
                 final String originalName = Thread.currentThread().getName();
                 Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
-                    "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
+                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
 
                 try {
                     // First we collect the info data from both the local and remote ends
@@ -335,11 +338,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
                         configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
                 });
-                 ServiceSupport.dispose(localBroker);
-                 ServiceSupport.dispose(remoteBroker);
-                 // the bridge is left in a bit of limbo, but it won't get retried
-                 // in this state.
-                 return;
+                ServiceSupport.dispose(localBroker);
+                ServiceSupport.dispose(remoteBroker);
+                // the bridge is left in a bit of limbo, but it won't get retried
+                // in this state.
+                return;
             }
 
             // Fill in the remote broker's information now.
@@ -431,7 +434,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                         ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
                         duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
                         duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
-                            + configuration.getBrokerName());
+                                + configuration.getBrokerName());
                         duplexLocalConnectionInfo.setUserName(configuration.getUserName());
                         duplexLocalConnectionInfo.setPassword(configuration.getPassword());
 
@@ -578,7 +581,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                         if (command.isMessage()) {
                             final ActiveMQMessage message = (ActiveMQMessage) command;
                             if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
-                                || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
+                                    || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
                                 serviceRemoteConsumerAdvisory(message.getDataStructure());
                                 ackAdvisory(message);
                             } else {
@@ -587,27 +590,35 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                                 }
                                 // message being forwarded - we need to
                                 // propagate the response to our local send
-                                message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
-                                if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
-                                    duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
-                                        final int correlationId = message.getCommandId();
-
-                                        @Override
-                                        public void onCompletion(FutureResponse resp) {
-                                            try {
-                                                Response reply = resp.getResult();
-                                                reply.setCorrelationId(correlationId);
-                                                remoteBroker.oneway(reply);
-                                            } catch (IOException error) {
-                                                LOG.error("Exception: {} on duplex forward of: {}", error, message);
-                                                serviceRemoteException(error);
+                                if (canDuplexDispatch(message)) {
+                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
+                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
+                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
+                                            final int correlationId = message.getCommandId();
+
+                                            @Override
+                                            public void onCompletion(FutureResponse resp) {
+                                                try {
+                                                    Response reply = resp.getResult();
+                                                    reply.setCorrelationId(correlationId);
+                                                    remoteBroker.oneway(reply);
+                                                } catch (IOException error) {
+                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
+                                                    serviceRemoteException(error);
+                                                }
                                             }
-                                        }
-                                    });
+                                        });
+                                    } else {
+                                        duplexInboundLocalBroker.oneway(message);
+                                    }
+                                    serviceInboundMessage(message);
                                 } else {
-                                    duplexInboundLocalBroker.oneway(message);
+                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
+                                        Response reply = new Response();
+                                        reply.setCorrelationId(message.getCommandId());
+                                        remoteBroker.oneway(reply);
+                                    }
                                 }
-                                serviceInboundMessage(message);
                             }
                         } else {
                             switch (command.getDataStructureType()) {
@@ -817,7 +828,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 return;
             }
 
-            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{ localBroker, remoteBroker, error });
+            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
             LOG.debug("The local Exception was: {}", error, error);
 
             brokerService.getTaskRunnerFactory().execute(new Runnable() {
@@ -844,7 +855,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                     ActiveMQMessage advisoryMessage = new ActiveMQMessage();
                     advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
                     advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
-                        advisoryMessage);
+                            advisoryMessage);
 
                 }
             } catch (Exception e) {
@@ -871,7 +882,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
     protected void removeSubscription(final DemandSubscription sub) throws IOException {
         if (sub != null) {
-            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{ configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId() });
+            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
 
             // ensure not available for conduit subs pending removal
             subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
@@ -1049,7 +1060,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
     protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
         if (brokerPath == null || brokerPath.length == 0) {
-            return new BrokerId[] { idToAppend };
+            return new BrokerId[]{idToAppend};
         }
         BrokerId rc[] = new BrokerId[brokerPath.length + 1];
         System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
@@ -1156,7 +1167,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         boolean suppress = false;
 
         if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
-            && !configuration.isSuppressDuplicateTopicSubscriptions()) {
+                && !configuration.isSuppressDuplicateTopicSubscriptions()) {
             return suppress;
         }
 
@@ -1276,7 +1287,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                 priority -= info.getBrokerPath().length + 1;
             }
             result.getLocalInfo().setPriority(priority);
-            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{ configuration.getBrokerName(), priority, info });
+            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
         }
         configureDemandSubscription(info, result);
         return result;
@@ -1288,7 +1299,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         info.setDestination(destination);
 
         // Indicate that this subscription is being made on behalf of the remote broker.
-        info.setBrokerPath(new BrokerId[] { remoteBrokerId });
+        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
 
         // the remote info held by the DemandSubscription holds the original
         // consumerId, the local info get's overwritten
@@ -1352,8 +1363,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     /**
      * Performs a timed wait on the started latch and then checks for disposed
      * before performing another wait each time the the started wait times out.
-     *
-     * @throws InterruptedException
      */
     protected void safeWaitUntilStarted() throws InterruptedException {
         while (!disposed.get()) {
@@ -1403,7 +1412,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
     /**
      * @param dynamicallyIncludedDestinations
-     *            The dynamicallyIncludedDestinations to set.
+     *         The dynamicallyIncludedDestinations to set.
      */
     public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
         this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
@@ -1417,8 +1426,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     }
 
     /**
-     * @param excludedDestinations
-     *            The excludedDestinations to set.
+     * @param excludedDestinations The excludedDestinations to set.
      */
     public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
         this.excludedDestinations = excludedDestinations;
@@ -1432,8 +1440,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     }
 
     /**
-     * @param staticallyIncludedDestinations
-     *            The staticallyIncludedDestinations to set.
+     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
      */
     public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
         this.staticallyIncludedDestinations = staticallyIncludedDestinations;
@@ -1447,8 +1454,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     }
 
     /**
-     * @param durableDestinations
-     *            The durableDestinations to set.
+     * @param durableDestinations The durableDestinations to set.
      */
     public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
         this.durableDestinations = durableDestinations;
@@ -1476,8 +1482,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
     }
 
     /**
-     * @param createdByDuplex
-     *            the createdByDuplex to set
+     * @param createdByDuplex the createdByDuplex to set
      */
     public void setCreatedByDuplex(boolean createdByDuplex) {
         this.createdByDuplex = createdByDuplex;
@@ -1500,7 +1505,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
     @Override
     public String getRemoteBrokerId() {
-        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() ==null)? null : remoteBrokerInfo.getBrokerId().toString();
+        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
     }
 
     @Override
@@ -1543,7 +1548,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         return mbeanObjectName;
     }
 
-    public void resetStats(){
+    public void resetStats() {
         enqueueCounter.set(0);
         dequeueCounter.set(0);
     }
@@ -1624,18 +1629,43 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         }
     }
 
-    protected void serviceOutbound(Message message){
+    protected void serviceOutbound(Message message) {
         NetworkBridgeListener l = this.networkBridgeListener;
-        if (l != null){
-            l.onOutboundMessage(this,message);
+        if (l != null) {
+            l.onOutboundMessage(this, message);
         }
     }
 
-    protected void serviceInboundMessage(Message message){
+    protected void serviceInboundMessage(Message message) {
         NetworkBridgeListener l = this.networkBridgeListener;
-        if (l != null){
-            l.onInboundMessage(this,message);
+        if (l != null) {
+            l.onInboundMessage(this, message);
+        }
+    }
+
+    /** Returns false when duplicate checking is enabled and the message's producer sequence id is not above the last one stored for its producer. */
+    protected boolean canDuplexDispatch(Message message) {
+        if (!configuration.isCheckDuplicateMessagesOnDuplex()) {
+            return true;
+        }
+        final long producerSequenceId = message.getMessageId().getProducerSequenceId();
+        // messages are multiplexed on this producer so we need to query the persistenceAdapter
+        final long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
+        if (producerSequenceId > lastStoredForMessageProducer) {
+            return true;
+        }
+        LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than or equal to last stored: {}", new Object[]{
+                (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer});
+        return false;
+    }
+
+    protected long getStoredSequenceIdForMessage(MessageId messageId) { // asks the broker's persistence adapter for the last sequence id of this message's producer
+        try {
+            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
+        } catch (IOException ignored) {
+            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored); // best effort: the failure is logged at debug level and not propagated
         }
+        return -1; // fallback when the lookup fails; any non-negative sequence id compares greater, so canDuplexDispatch will not flag a duplicate
     }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2bbfbcfb/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index bfff94e..3a59f30 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -61,6 +61,7 @@ public class NetworkBridgeConfiguration {
     private boolean useBrokerNamesAsIdSeed = true;
     private boolean gcDestinationViews = true;
     private long gcSweepTime = 60 * 1000;
+    private boolean checkDuplicateMessagesOnDuplex = false;
 
     /**
      * @return the conduitSubscriptions
@@ -440,4 +441,12 @@ public class NetworkBridgeConfiguration {
         this.gcSweepTime = gcSweepTime;
     }
 
+    public boolean isCheckDuplicateMessagesOnDuplex() {
+        return checkDuplicateMessagesOnDuplex;
+    }
+
+    public void setCheckDuplicateMessagesOnDuplex(boolean checkDuplicateMessagesOnDuplex) {
+        this.checkDuplicateMessagesOnDuplex = checkDuplicateMessagesOnDuplex;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2bbfbcfb/activemq-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java
new file mode 100644
index 0000000..68681c6
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.net.ServerSocketFactory;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.nio.NIOTransport;
+import org.apache.activemq.transport.nio.NIOTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+/**
+ *
+ * @author x22koe
+ */
+public class CheckDuplicateMessagesOnDuplexTest {
+
+    private static final Logger log = LoggerFactory.getLogger(CheckDuplicateMessagesOnDuplexTest.class);
+    private BrokerService localBroker;
+    private BrokerService remoteBroker;
+    private ActiveMQConnectionFactory localFactory;
+    private ActiveMQConnectionFactory remoteFactory;
+    private Session localSession;
+    private MessageConsumer consumer;
+    private Session remoteSession;
+    private MessageProducer producer;
+    private Connection remoteConnection;
+    private Connection localConnection;
+    private DebugTransportFilter debugTransportFilter;
+    private boolean useLevelDB = false;
+
+    public CheckDuplicateMessagesOnDuplexTest() {
+    }
+
+    @BeforeClass
+    public static void setUpClass() {
+    }
+
+    @AfterClass
+    public static void tearDownClass() {
+    }
+
+    @Before
+    public void setUp() {
+    }
+
+    @After
+    public void tearDown() {
+    }
+
+    @Test
+    public void testConnectionLossBehaviorBeforeAckIsSent() throws Exception {
+        createBrokers();
+        localBroker.deleteAllMessages();
+        remoteBroker.deleteAllMessages();
+        startBrokers();
+        openConnections();
+
+        Thread.sleep(1000);
+        log.info("\n\n==============================================\nsend hello1\n");
+
+        // simulate network failure between REMOTE and LOCAL just before the reception response is sent back to REMOTE
+        debugTransportFilter.closeOnResponse = true;
+
+        producer.send(remoteSession.createTextMessage("hello1"));
+        Message msg = consumer.receive(30000);
+
+        assertNotNull("expected hello1", msg);
+        assertEquals("hello1", ((TextMessage) msg).getText());
+
+        Thread.sleep(1000);
+        log.info("\n\n------------------------------------------\nsend hello2\n");
+
+        producer.send(remoteSession.createTextMessage("hello2"));
+        msg = consumer.receive(30000);
+
+        assertNotNull("expected hello2", msg);
+        assertEquals("hello2", ((TextMessage) msg).getText());
+
+        closeLocalConnection();
+
+        Thread.sleep(1000);
+        log.info("\n\n------------------------------------------\nsend hello3\n");
+
+        openLocalConnection();
+
+        Thread.sleep(1000);
+
+        producer.send(remoteSession.createTextMessage("hello3"));
+        msg = consumer.receive(30000);
+
+        assertNotNull("expected hello3", msg);
+        assertEquals("hello3", ((TextMessage) msg).getText());
+
+        Thread.sleep(1000);
+        log.info("\n\n==============================================\n\n");
+
+        closeConnections();
+        stopBrokers();
+
+        // restart the local broker, which should be empty
+
+        Thread.sleep(1000);
+        log.info("\n\n##############################################\n\n");
+
+        createLocalBroker();
+        startLocalBroker();
+        openLocalConnection();
+
+        // this should not return the "hello1" message
+        msg = consumer.receive(1000);
+
+        closeLocalConnection();
+        stopLocalBroker();
+
+        assertNull(msg);
+    }
+
+    private void createBrokers() throws Exception {
+        createLocalBroker();
+        createRemoteBroker();
+    }
+
+    private void createLocalBroker() throws Exception {
+        localBroker = new BrokerService();
+        localBroker.setBrokerName("LOCAL");
+        localBroker.setUseJmx(true);
+        localBroker.setSchedulePeriodForDestinationPurge(5000);
+        ManagementContext managementContext = new ManagementContext();
+        managementContext.setCreateConnector(false);
+        localBroker.setManagementContext(managementContext);
+        PersistenceAdapter persistenceAdapter = persistanceAdapterFactory("target/local");
+        localBroker.setPersistenceAdapter(persistenceAdapter);
+        List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
+        DebugTransportFactory tf = new DebugTransportFactory();
+        TransportServer transport = tf.doBind(URI.create("nio://127.0.0.1:23539"));
+        TransportConnector transportConnector = new TransportConnector(transport);
+        transportConnector.setName("tc");
+        transportConnector.setAuditNetworkProducers(true);
+        transportConnectors.add(transportConnector);
+        localBroker.setTransportConnectors(transportConnectors);
+    }
+
+    private void createRemoteBroker() throws Exception {
+        remoteBroker = new BrokerService();
+        remoteBroker.setBrokerName("REMOTE");
+        remoteBroker.setUseJmx(true);
+        remoteBroker.setSchedulePeriodForDestinationPurge(5000);
+        ManagementContext managementContext = new ManagementContext();
+        managementContext.setCreateConnector(false);
+        remoteBroker.setManagementContext(managementContext);
+        PersistenceAdapter persistenceAdapter = persistanceAdapterFactory("target/remote");
+        remoteBroker.setPersistenceAdapter(persistenceAdapter);
+        List<NetworkConnector> networkConnectors = new ArrayList<NetworkConnector>();
+        DiscoveryNetworkConnector networkConnector = new DiscoveryNetworkConnector();
+        networkConnector.setName("to local");
+        // set maxInactivityDuration to 0, otherwise the broker restarts while you are in the debugger
+        networkConnector.setUri(URI.create("static://(tcp://127.0.0.1:23539?wireFormat.maxInactivityDuration=0)"));
+        networkConnector.setDuplex(true);
+        //networkConnector.setNetworkTTL(5);
+        //networkConnector.setDynamicOnly(true);
+        networkConnector.setAlwaysSyncSend(true);
+        networkConnector.setDecreaseNetworkConsumerPriority(false);
+        networkConnector.setPrefetchSize(1);
+        networkConnector.setCheckDuplicateMessagesOnDuplex(true);
+        networkConnectors.add(networkConnector);
+        remoteBroker.setNetworkConnectors(networkConnectors);
+    }
+
+    private void startBrokers() throws Exception {
+        startLocalBroker();
+        startRemoteBroker();
+    }
+
+    private void startLocalBroker() throws Exception {
+        localBroker.start();
+        localBroker.waitUntilStarted();
+    }
+
+    private void startRemoteBroker() throws Exception {
+        remoteBroker.start();
+        remoteBroker.waitUntilStarted();
+    }
+
+    /**
+     * Opens the client connection to the local broker, then the one to the remote broker.
+     *
+     * @throws JMSException if either connection cannot be opened
+     */
+    private void openConnections() throws JMSException {
+        this.openLocalConnection();
+        this.openRemoteConnection();
+    }
+
+    /**
+     * Connects to the local broker through its VM connector URI, starts the connection, opens a
+     * non-transacted auto-acknowledge session and creates the consumer on queue "testqueue".
+     *
+     * @throws JMSException if the connection, session or consumer cannot be created
+     */
+    private void openLocalConnection() throws JMSException {
+        this.localFactory = new ActiveMQConnectionFactory(this.localBroker.getVmConnectorURI());
+        this.localConnection = this.localFactory.createConnection();
+        this.localConnection.start();
+        this.localSession = this.localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        this.consumer = this.localSession.createConsumer(this.localSession.createQueue("testqueue"));
+    }
+
+    /**
+     * Connects to the remote broker through its VM connector URI, starts the connection, opens a
+     * non-transacted auto-acknowledge session and creates the producer on queue "testqueue".
+     *
+     * @throws JMSException if the connection, session or producer cannot be created
+     */
+    private void openRemoteConnection() throws JMSException {
+        this.remoteFactory = new ActiveMQConnectionFactory(this.remoteBroker.getVmConnectorURI());
+        this.remoteConnection = this.remoteFactory.createConnection();
+        this.remoteConnection.start();
+        this.remoteSession = this.remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        this.producer = this.remoteSession.createProducer(this.remoteSession.createQueue("testqueue"));
+    }
+
+    /**
+     * Closes the local client connection and then the remote one.
+     * <p>
+     * The remote connection is closed in a {@code finally} block so that a failure while closing
+     * the local connection does not leave the remote connection open.
+     *
+     * @throws JMSException if closing either connection fails; when both fail, the exception from
+     *                      the remote close is the one propagated
+     */
+    private void closeConnections() throws JMSException {
+        try {
+            closeLocalConnection();
+        } finally {
+            closeRemoteConnection();
+        }
+    }
+
+    /**
+     * Closes the client connection to the local broker.
+     *
+     * @throws JMSException if the connection cannot be closed
+     */
+    private void closeLocalConnection() throws JMSException {
+        this.localConnection.close();
+    }
+
+    /**
+     * Closes the client connection to the remote broker.
+     *
+     * @throws JMSException if the connection cannot be closed
+     */
+    private void closeRemoteConnection() throws JMSException {
+        this.remoteConnection.close();
+    }
+
+    /**
+     * Stops the remote broker and then the local broker.
+     * <p>
+     * The local broker is stopped in a {@code finally} block so that a failure while stopping the
+     * remote broker does not leave the local broker running.
+     *
+     * @throws Exception if stopping either broker fails; when both fail, the exception from the
+     *                   local stop is the one propagated
+     */
+    private void stopBrokers() throws Exception {
+        try {
+            stopRemoteBroker();
+        } finally {
+            stopLocalBroker();
+        }
+    }
+
+    /**
+     * Stops the local broker and waits until it reports that it has stopped.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    private void stopLocalBroker() throws Exception {
+        this.localBroker.stop();
+        this.localBroker.waitUntilStopped();
+    }
+
+    /**
+     * Stops the remote broker and waits until it reports that it has stopped.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    private void stopRemoteBroker() throws Exception {
+        this.remoteBroker.stop();
+        this.remoteBroker.waitUntilStopped();
+    }
+
+    /**
+     * Creates the persistence adapter for a broker, choosing LevelDB when {@code useLevelDB} is set
+     * and KahaDB otherwise.
+     *
+     * @param path directory in which the store keeps its files
+     * @return a newly created persistence adapter for {@code path}
+     */
+    private PersistenceAdapter persistanceAdapterFactory(String path) {
+        return useLevelDB
+                ? persistanceAdapterFactory_LevelDB(path)
+                : persistanceAdapterFactory_KahaDB(path);
+    }
+
+    /**
+     * Creates a KahaDB persistence adapter stored under {@code path}, with journal checksums and
+     * corruption checks enabled and missing journal files ignored.
+     *
+     * @param path directory in which KahaDB keeps its files
+     * @return the configured KahaDB adapter
+     */
+    private PersistenceAdapter persistanceAdapterFactory_KahaDB(String path) {
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setDirectory(new File(path));
+        // journal integrity settings
+        adapter.setChecksumJournalFiles(true);
+        adapter.setCheckForCorruptJournalFiles(true);
+        adapter.setIgnoreMissingJournalfiles(true);
+        return adapter;
+    }
+
+    /**
+     * Creates a LevelDB persistence adapter stored under {@code path}; all other settings are left
+     * at the adapter's defaults.
+     *
+     * @param path directory in which LevelDB keeps its files
+     * @return the configured LevelDB adapter
+     */
+    private PersistenceAdapter persistanceAdapterFactory_LevelDB(String path) {
+        LevelDBPersistenceAdapter adapter = new LevelDBPersistenceAdapter();
+        adapter.setDirectory(new File(path));
+        return adapter;
+    }
+
+    /**
+     * NIO transport factory that hands out {@link DebugTransportServer} instances as its TCP
+     * transport server.
+     */
+    private class DebugTransportFactory extends NIOTransportFactory {
+
+        /**
+         * Builds the transport server for {@code location} as a {@link DebugTransportServer}
+         * backed by this factory.
+         */
+        @Override
+        protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory)
+                throws IOException, URISyntaxException {
+            DebugTransportServer debugServer = new DebugTransportServer(this, location, serverSocketFactory);
+            return debugServer;
+        }
+    }
+
+    /**
+     * TCP transport server that wraps every accepted socket in an NIO transport fronted by a
+     * {@link DebugTransportFilter}, and records that filter in the enclosing test's
+     * {@code debugTransportFilter} field.
+     */
+    private class DebugTransportServer extends TcpTransportServer {
+
+        public DebugTransportServer(TcpTransportFactory transportFactory, URI location,
+                                    ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+            super(transportFactory, location, serverSocketFactory);
+        }
+
+        /**
+         * Creates the transport for an accepted socket. Each call overwrites
+         * {@code debugTransportFilter}, so the field always refers to the most recently created
+         * filter.
+         */
+        @Override
+        protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+            debugTransportFilter = new DebugTransportFilter(new NIOTransport(format, socket));
+            return debugTransportFilter;
+        }
+    }
+
+    /**
+     * Transport filter used to simulate a connection that dies before a response is delivered.
+     * <p>
+     * While {@code closeOnResponse} is set, the next {@code Response} passed to
+     * {@link #oneway(Object)} is dropped and the wrapped transport is stopped instead. The flag is
+     * cleared at that point, so the behaviour triggers once per arming.
+     */
+    private class DebugTransportFilter extends TransportFilter {
+
+        // Nothing in this class sets the flag to true, so it is armed from outside, while
+        // oneway() reads it, presumably on a transport thread. volatile makes that write visible
+        // to the reading thread.
+        volatile boolean closeOnResponse = false;
+
+        public DebugTransportFilter(Transport next) {
+            super(next);
+        }
+
+        /**
+         * Forwards {@code command} to the wrapped transport, unless the filter is armed and the
+         * command is a {@code Response}; in that case the wrapped transport is stopped and the
+         * response is discarded.
+         *
+         * @param command the command to send
+         * @throws IOException if forwarding the command fails
+         */
+        @Override
+        public void oneway(Object command) throws IOException {
+            if (closeOnResponse && command instanceof Response) {
+                // disarm first so only a single response is affected
+                closeOnResponse = false;
+                log.warn("\n\nclosing connection before response is sent\n\n");
+                try {
+                    // stop() is declared on the Transport type, so no downcast is needed
+                    next.stop();
+                } catch (Exception ex) {
+                    log.error("couldn't stop niotransport", ex);
+                }
+                // don't send response
+                return;
+            }
+            super.oneway(command);
+        }
+    }
+}
\ No newline at end of file