You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/12/11 12:14:02 UTC
git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4924
Updated Branches:
refs/heads/trunk ff0dd5a91 -> 2bbfbcfb2
Fix for https://issues.apache.org/jira/browse/AMQ-4924
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2bbfbcfb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2bbfbcfb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2bbfbcfb
Branch: refs/heads/trunk
Commit: 2bbfbcfb29463f9723a0efe548d46af7ddb3fbb6
Parents: ff0dd5a
Author: rajdavies <ra...@gmail.com>
Authored: Wed Dec 11 11:13:23 2013 +0000
Committer: rajdavies <ra...@gmail.com>
Committed: Wed Dec 11 11:13:45 2013 +0000
----------------------------------------------------------------------
.../network/DemandForwardingBridgeSupport.java | 144 +++++---
.../network/NetworkBridgeConfiguration.java | 9 +
.../CheckDuplicateMessagesOnDuplexTest.java | 356 +++++++++++++++++++
3 files changed, 452 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/2bbfbcfb/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 1126d22..555f0ec 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -19,7 +19,11 @@ package org.apache.activemq.network;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -32,7 +36,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
-
import org.apache.activemq.DestinationDoesNotExistException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
@@ -96,14 +99,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected ActiveMQDestination[] durableDestinations;
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
- protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
+ protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
protected final CountDownLatch startedLatch = new CountDownLatch(2);
protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
- protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
+ protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
protected BrokerId remoteBrokerId;
final AtomicLong enqueueCounter = new AtomicLong();
@@ -251,7 +254,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
serialExecutor.shutdown();
if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
List<Runnable> pendingTasks = serialExecutor.shutdownNow();
- LOG.info("pending tasks on stop {}", pendingTasks);
+ LOG.info("pending tasks on stop {}", pendingTasks);
}
localBroker.oneway(new ShutdownInfo());
remoteBroker.oneway(new ShutdownInfo());
@@ -292,7 +295,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
public void run() {
final String originalName = Thread.currentThread().getName();
Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
- "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
+ "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
try {
// First we collect the info data from both the local and remote ends
@@ -335,11 +338,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
});
- ServiceSupport.dispose(localBroker);
- ServiceSupport.dispose(remoteBroker);
- // the bridge is left in a bit of limbo, but it won't get retried
- // in this state.
- return;
+ ServiceSupport.dispose(localBroker);
+ ServiceSupport.dispose(remoteBroker);
+ // the bridge is left in a bit of limbo, but it won't get retried
+ // in this state.
+ return;
}
// Fill in the remote broker's information now.
@@ -431,7 +434,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
- + configuration.getBrokerName());
+ + configuration.getBrokerName());
duplexLocalConnectionInfo.setUserName(configuration.getUserName());
duplexLocalConnectionInfo.setPassword(configuration.getPassword());
@@ -578,7 +581,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (command.isMessage()) {
final ActiveMQMessage message = (ActiveMQMessage) command;
if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
- || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
+ || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
serviceRemoteConsumerAdvisory(message.getDataStructure());
ackAdvisory(message);
} else {
@@ -587,27 +590,35 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
// message being forwarded - we need to
// propagate the response to our local send
- message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
- if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
- duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
- final int correlationId = message.getCommandId();
-
- @Override
- public void onCompletion(FutureResponse resp) {
- try {
- Response reply = resp.getResult();
- reply.setCorrelationId(correlationId);
- remoteBroker.oneway(reply);
- } catch (IOException error) {
- LOG.error("Exception: {} on duplex forward of: {}", error, message);
- serviceRemoteException(error);
+ if (canDuplexDispatch(message)) {
+ message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
+ if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
+ duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
+ final int correlationId = message.getCommandId();
+
+ @Override
+ public void onCompletion(FutureResponse resp) {
+ try {
+ Response reply = resp.getResult();
+ reply.setCorrelationId(correlationId);
+ remoteBroker.oneway(reply);
+ } catch (IOException error) {
+ LOG.error("Exception: {} on duplex forward of: {}", error, message);
+ serviceRemoteException(error);
+ }
}
- }
- });
+ });
+ } else {
+ duplexInboundLocalBroker.oneway(message);
+ }
+ serviceInboundMessage(message);
} else {
- duplexInboundLocalBroker.oneway(message);
+ if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
+ Response reply = new Response();
+ reply.setCorrelationId(message.getCommandId());
+ remoteBroker.oneway(reply);
+ }
}
- serviceInboundMessage(message);
}
} else {
switch (command.getDataStructureType()) {
@@ -817,7 +828,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return;
}
- LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{ localBroker, remoteBroker, error });
+ LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
LOG.debug("The local Exception was: {}", error, error);
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@@ -844,7 +855,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
- advisoryMessage);
+ advisoryMessage);
}
} catch (Exception e) {
@@ -871,7 +882,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected void removeSubscription(final DemandSubscription sub) throws IOException {
if (sub != null) {
- LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{ configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId() });
+ LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
// ensure not available for conduit subs pending removal
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
@@ -1049,7 +1060,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
if (brokerPath == null || brokerPath.length == 0) {
- return new BrokerId[] { idToAppend };
+ return new BrokerId[]{idToAppend};
}
BrokerId rc[] = new BrokerId[brokerPath.length + 1];
System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
@@ -1156,7 +1167,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
boolean suppress = false;
if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
- && !configuration.isSuppressDuplicateTopicSubscriptions()) {
+ && !configuration.isSuppressDuplicateTopicSubscriptions()) {
return suppress;
}
@@ -1276,7 +1287,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
priority -= info.getBrokerPath().length + 1;
}
result.getLocalInfo().setPriority(priority);
- LOG.debug("{} using priority: {} for subscription: {}", new Object[]{ configuration.getBrokerName(), priority, info });
+ LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
}
configureDemandSubscription(info, result);
return result;
@@ -1288,7 +1299,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
info.setDestination(destination);
// Indicate that this subscription is being made on behalf of the remote broker.
- info.setBrokerPath(new BrokerId[] { remoteBrokerId });
+ info.setBrokerPath(new BrokerId[]{remoteBrokerId});
// the remote info held by the DemandSubscription holds the original
// consumerId, the local info get's overwritten
@@ -1352,8 +1363,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
/**
* Performs a timed wait on the started latch and then checks for disposed
* before performing another wait each time the the started wait times out.
- *
- * @throws InterruptedException
*/
protected void safeWaitUntilStarted() throws InterruptedException {
while (!disposed.get()) {
@@ -1403,7 +1412,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
/**
* @param dynamicallyIncludedDestinations
- * The dynamicallyIncludedDestinations to set.
+ * The dynamicallyIncludedDestinations to set.
*/
public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
@@ -1417,8 +1426,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
/**
- * @param excludedDestinations
- * The excludedDestinations to set.
+ * @param excludedDestinations The excludedDestinations to set.
*/
public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
this.excludedDestinations = excludedDestinations;
@@ -1432,8 +1440,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
/**
- * @param staticallyIncludedDestinations
- * The staticallyIncludedDestinations to set.
+ * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
*/
public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
this.staticallyIncludedDestinations = staticallyIncludedDestinations;
@@ -1447,8 +1454,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
/**
- * @param durableDestinations
- * The durableDestinations to set.
+ * @param durableDestinations The durableDestinations to set.
*/
public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
this.durableDestinations = durableDestinations;
@@ -1476,8 +1482,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
/**
- * @param createdByDuplex
- * the createdByDuplex to set
+ * @param createdByDuplex the createdByDuplex to set
*/
public void setCreatedByDuplex(boolean createdByDuplex) {
this.createdByDuplex = createdByDuplex;
@@ -1500,7 +1505,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
@Override
public String getRemoteBrokerId() {
- return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() ==null)? null : remoteBrokerInfo.getBrokerId().toString();
+ return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
}
@Override
@@ -1543,7 +1548,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return mbeanObjectName;
}
- public void resetStats(){
+ public void resetStats() {
enqueueCounter.set(0);
dequeueCounter.set(0);
}
@@ -1624,18 +1629,43 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
- protected void serviceOutbound(Message message){
+ protected void serviceOutbound(Message message) {
NetworkBridgeListener l = this.networkBridgeListener;
- if (l != null){
- l.onOutboundMessage(this,message);
+ if (l != null) {
+ l.onOutboundMessage(this, message);
}
}
- protected void serviceInboundMessage(Message message){
+ protected void serviceInboundMessage(Message message) {
NetworkBridgeListener l = this.networkBridgeListener;
- if (l != null){
- l.onInboundMessage(this,message);
+ if (l != null) {
+ l.onInboundMessage(this, message);
+ }
+ }
+
+ protected boolean canDuplexDispatch(Message message) {
+ boolean result = true;
+ if (configuration.isCheckDuplicateMessagesOnDuplex()){
+ final long producerSequenceId = message.getMessageId().getProducerSequenceId();
+ // messages are multiplexed on this producer so we need to query the persistenceAdapter
+ long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
+ if (producerSequenceId <= lastStoredForMessageProducer) {
+ result = false;
+ LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
+ (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
+ });
+ }
+ }
+ return result;
+ }
+
+ protected long getStoredSequenceIdForMessage(MessageId messageId) {
+ try {
+ return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
+ } catch (IOException ignored) {
+ LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
}
+ return -1;
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/2bbfbcfb/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index bfff94e..3a59f30 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -61,6 +61,7 @@ public class NetworkBridgeConfiguration {
private boolean useBrokerNamesAsIdSeed = true;
private boolean gcDestinationViews = true;
private long gcSweepTime = 60 * 1000;
+ private boolean checkDuplicateMessagesOnDuplex = false;
/**
* @return the conduitSubscriptions
@@ -440,4 +441,12 @@ public class NetworkBridgeConfiguration {
this.gcSweepTime = gcSweepTime;
}
+ public boolean isCheckDuplicateMessagesOnDuplex() {
+ return checkDuplicateMessagesOnDuplex;
+ }
+
+ public void setCheckDuplicateMessagesOnDuplex(boolean checkDuplicateMessagesOnDuplex) {
+ this.checkDuplicateMessagesOnDuplex = checkDuplicateMessagesOnDuplex;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/2bbfbcfb/activemq-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java
new file mode 100644
index 0000000..68681c6
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CheckDuplicateMessagesOnDuplexTest.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.net.ServerSocketFactory;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.nio.NIOTransport;
+import org.apache.activemq.transport.nio.NIOTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+/**
+ *
+ * @author x22koe
+ */
+public class CheckDuplicateMessagesOnDuplexTest {
+
+ private static final Logger log = LoggerFactory.getLogger(CheckDuplicateMessagesOnDuplexTest.class);
+ private BrokerService localBroker;
+ private BrokerService remoteBroker;
+ private ActiveMQConnectionFactory localFactory;
+ private ActiveMQConnectionFactory remoteFactory;
+ private Session localSession;
+ private MessageConsumer consumer;
+ private Session remoteSession;
+ private MessageProducer producer;
+ private Connection remoteConnection;
+ private Connection localConnection;
+ private DebugTransportFilter debugTransportFilter;
+ private boolean useLevelDB = false;
+
+ public CheckDuplicateMessagesOnDuplexTest() {
+ }
+
+ @BeforeClass
+ public static void setUpClass() {
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ }
+
+ @Before
+ public void setUp() {
+ }
+
+ @After
+ public void tearDown() {
+ }
+
+ @Test
+ public void testConnectionLossBehaviorBeforeAckIsSent() throws Exception {
+ createBrokers();
+ localBroker.deleteAllMessages();
+ remoteBroker.deleteAllMessages();
+ startBrokers();
+ openConnections();
+
+ Thread.sleep(1000);
+ log.info("\n\n==============================================\nsend hello1\n");
+
+ // simulate network failure between REMOTE and LOCAL just before the reception response is sent back to REMOTE
+ debugTransportFilter.closeOnResponse = true;
+
+ producer.send(remoteSession.createTextMessage("hello1"));
+ Message msg = consumer.receive(30000);
+
+ assertNotNull("expected hello1", msg);
+ assertEquals("hello1", ((TextMessage) msg).getText());
+
+ Thread.sleep(1000);
+ log.info("\n\n------------------------------------------\nsend hello2\n");
+
+ producer.send(remoteSession.createTextMessage("hello2"));
+ msg = consumer.receive(30000);
+
+ assertNotNull("expected hello2", msg);
+ assertEquals("hello2", ((TextMessage) msg).getText());
+
+ closeLocalConnection();
+
+ Thread.sleep(1000);
+ log.info("\n\n------------------------------------------\nsend hello3\n");
+
+ openLocalConnection();
+
+ Thread.sleep(1000);
+
+ producer.send(remoteSession.createTextMessage("hello3"));
+ msg = consumer.receive(30000);
+
+ assertNotNull("expected hello3", msg);
+ assertEquals("hello3", ((TextMessage) msg).getText());
+
+ Thread.sleep(1000);
+ log.info("\n\n==============================================\n\n");
+
+ closeConnections();
+ stopBrokers();
+
+ // restart the local broker, which should be empty
+
+ Thread.sleep(1000);
+ log.info("\n\n##############################################\n\n");
+
+ createLocalBroker();
+ startLocalBroker();
+ openLocalConnection();
+
+ // this should not return the "hello1" message
+ msg = consumer.receive(1000);
+
+ closeLocalConnection();
+ stopLocalBroker();
+
+ assertNull(msg);
+ }
+
+ private void createBrokers() throws Exception {
+ createLocalBroker();
+ createRemoteBroker();
+ }
+
+ private void createLocalBroker() throws Exception {
+ localBroker = new BrokerService();
+ localBroker.setBrokerName("LOCAL");
+ localBroker.setUseJmx(true);
+ localBroker.setSchedulePeriodForDestinationPurge(5000);
+ ManagementContext managementContext = new ManagementContext();
+ managementContext.setCreateConnector(false);
+ localBroker.setManagementContext(managementContext);
+ PersistenceAdapter persistenceAdapter = persistanceAdapterFactory("target/local");
+ localBroker.setPersistenceAdapter(persistenceAdapter);
+ List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>();
+ DebugTransportFactory tf = new DebugTransportFactory();
+ TransportServer transport = tf.doBind(URI.create("nio://127.0.0.1:23539"));
+ TransportConnector transportConnector = new TransportConnector(transport);
+ transportConnector.setName("tc");
+ transportConnector.setAuditNetworkProducers(true);
+ transportConnectors.add(transportConnector);
+ localBroker.setTransportConnectors(transportConnectors);
+ }
+
+ private void createRemoteBroker() throws Exception {
+ remoteBroker = new BrokerService();
+ remoteBroker.setBrokerName("REMOTE");
+ remoteBroker.setUseJmx(true);
+ remoteBroker.setSchedulePeriodForDestinationPurge(5000);
+ ManagementContext managementContext = new ManagementContext();
+ managementContext.setCreateConnector(false);
+ remoteBroker.setManagementContext(managementContext);
+ PersistenceAdapter persistenceAdapter = persistanceAdapterFactory("target/remote");
+ remoteBroker.setPersistenceAdapter(persistenceAdapter);
+ List<NetworkConnector> networkConnectors = new ArrayList<NetworkConnector>();
+ DiscoveryNetworkConnector networkConnector = new DiscoveryNetworkConnector();
+ networkConnector.setName("to local");
+ // set maxInactivityDuration to 0, otherwise the broker restarts while you are in the debugger
+ networkConnector.setUri(URI.create("static://(tcp://127.0.0.1:23539?wireFormat.maxInactivityDuration=0)"));
+ networkConnector.setDuplex(true);
+ //networkConnector.setNetworkTTL(5);
+ //networkConnector.setDynamicOnly(true);
+ networkConnector.setAlwaysSyncSend(true);
+ networkConnector.setDecreaseNetworkConsumerPriority(false);
+ networkConnector.setPrefetchSize(1);
+ networkConnector.setCheckDuplicateMessagesOnDuplex(true);
+ networkConnectors.add(networkConnector);
+ remoteBroker.setNetworkConnectors(networkConnectors);
+ }
+
+ private void startBrokers() throws Exception {
+ startLocalBroker();
+ startRemoteBroker();
+ }
+
+ private void startLocalBroker() throws Exception {
+ localBroker.start();
+ localBroker.waitUntilStarted();
+ }
+
+ private void startRemoteBroker() throws Exception {
+ remoteBroker.start();
+ remoteBroker.waitUntilStarted();
+ }
+
+ private void openConnections() throws JMSException {
+ openLocalConnection();
+ openRemoteConnection();
+ }
+
+ private void openLocalConnection() throws JMSException {
+ localFactory = new ActiveMQConnectionFactory(localBroker.getVmConnectorURI());
+ //localFactory.setSendAcksAsync(false);
+ localConnection = localFactory.createConnection();
+ localConnection.start();
+ localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = localSession.createConsumer(localSession.createQueue("testqueue"));
+ }
+
+ private void openRemoteConnection() throws JMSException {
+ remoteFactory = new ActiveMQConnectionFactory(remoteBroker.getVmConnectorURI());
+ //remoteFactory.setSendAcksAsync(false);
+ remoteConnection = remoteFactory.createConnection();
+ remoteConnection.start();
+ remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producer = remoteSession.createProducer(remoteSession.createQueue("testqueue"));
+ }
+
+ private void closeConnections() throws JMSException {
+ closeLocalConnection();
+ closeRemoteConnection();
+ }
+
+ private void closeLocalConnection() throws JMSException {
+ localConnection.close();
+ }
+
+ private void closeRemoteConnection() throws JMSException {
+ remoteConnection.close();
+ }
+
+ private void stopBrokers() throws Exception {
+ stopRemoteBroker();
+ stopLocalBroker();
+ }
+
+ private void stopLocalBroker() throws Exception {
+ localBroker.stop();
+ localBroker.waitUntilStopped();
+ }
+
+ private void stopRemoteBroker() throws Exception {
+ remoteBroker.stop();
+ remoteBroker.waitUntilStopped();
+ }
+
+ private PersistenceAdapter persistanceAdapterFactory(String path) {
+ if (useLevelDB) {
+ return persistanceAdapterFactory_LevelDB(path);
+ } else {
+ return persistanceAdapterFactory_KahaDB(path);
+ }
+ }
+
+ private PersistenceAdapter persistanceAdapterFactory_KahaDB(String path) {
+ KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
+ kahaDBPersistenceAdapter.setDirectory(new File(path));
+ kahaDBPersistenceAdapter.setIgnoreMissingJournalfiles(true);
+ kahaDBPersistenceAdapter.setCheckForCorruptJournalFiles(true);
+ kahaDBPersistenceAdapter.setChecksumJournalFiles(true);
+ return kahaDBPersistenceAdapter;
+ }
+
+ private PersistenceAdapter persistanceAdapterFactory_LevelDB(String path) {
+ LevelDBPersistenceAdapter levelDBPersistenceAdapter = new LevelDBPersistenceAdapter();
+ levelDBPersistenceAdapter.setDirectory(new File(path));
+ return levelDBPersistenceAdapter;
+ }
+
+ private class DebugTransportFactory extends NIOTransportFactory {
+
+ @Override
+ protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory)
+ throws IOException, URISyntaxException {
+ return new DebugTransportServer(this, location, serverSocketFactory);
+ }
+ }
+
+ private class DebugTransportServer extends TcpTransportServer {
+
+ public DebugTransportServer(TcpTransportFactory transportFactory, URI location,
+ ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+ super(transportFactory, location, serverSocketFactory);
+ }
+
+ @Override
+ protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+ Transport transport;
+ transport = new NIOTransport(format, socket);
+ debugTransportFilter = new DebugTransportFilter(transport);
+ return debugTransportFilter;
+ }
+ }
+
+ private class DebugTransportFilter extends TransportFilter {
+
+ boolean closeOnResponse = false;
+
+ public DebugTransportFilter(Transport next) {
+ super(next);
+ }
+
+ @Override
+ public void oneway(Object command) throws IOException {
+ if (closeOnResponse && command instanceof Response) {
+ closeOnResponse = false;
+ log.warn("\n\nclosing connection before response is sent\n\n");
+ try {
+ ((NIOTransport) next).stop();
+ } catch (Exception ex) {
+ log.error("couldn't stop niotransport", ex);
+ }
+ // don't send response
+ return;
+ }
+ super.oneway(command);
+ }
+ }
+}
\ No newline at end of file