You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/09/30 21:35:29 UTC
svn commit: r1177797 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
test/java/org/apache/activemq/bugs/AMQ3014Test.java
Author: tabish
Date: Fri Sep 30 19:35:28 2011
New Revision: 1177797
URL: http://svn.apache.org/viewvc?rev=1177797&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3014
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1177797&r1=1177796&r2=1177797&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Sep 30 19:35:28 2011
@@ -81,8 +81,8 @@ import org.slf4j.LoggerFactory;
/**
* A useful base class for implementing demand forwarding bridges.
- *
- *
+ *
+ *
*/
public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
@@ -231,7 +231,7 @@ public abstract class DemandForwardingBr
} catch (IOException e) {
LOG.warn("Caught exception from remote start", e);
}
- } else {
+ } else {
LOG.warn ("Bridge was disposed before the start() method was fully executed.");
throw new TransportDisposedIOException();
}
@@ -339,6 +339,7 @@ public abstract class DemandForwardingBr
IntrospectionSupport.getProperties(configuration, props, null);
String str = MarshallingSupport.propertiesToString(props);
brokerInfo.setNetworkProperties(str);
+ localBrokerIdKnownLatch.await();
brokerInfo.setBrokerId(this.localBrokerId);
remoteBroker.oneway(brokerInfo);
}
@@ -487,7 +488,7 @@ public abstract class DemandForwardingBr
if (isDuplex()) {
if (command.isMessage()) {
ActiveMQMessage message = (ActiveMQMessage) command;
- if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
+ if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
|| AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
serviceRemoteConsumerAdvisory(message.getDataStructure());
} else {
@@ -708,7 +709,7 @@ public abstract class DemandForwardingBr
final MessageDispatch md = (MessageDispatch) command;
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
-
+
if (suppressMessageDispatch(md, sub)) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
@@ -721,14 +722,14 @@ public abstract class DemandForwardingBr
}
return;
}
-
+
Message message = configureMessage(md);
if (LOG.isDebugEnabled()) {
LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + message.getMessageId() + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
}
-
+
if (!message.isResponseRequired()) {
-
+
// If the message was originally sent using async
// send, we will preserve that QOS
// by bridging it using an async send (small chance
@@ -740,9 +741,9 @@ public abstract class DemandForwardingBr
} finally {
sub.decrementOutstandingResponses();
}
-
+
} else {
-
+
// The message was not sent using async send, so we
// should only ack the local
// broker when we get confirmation that the remote
@@ -757,7 +758,7 @@ public abstract class DemandForwardingBr
} else {
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
dequeueCounter.incrementAndGet();
- }
+ }
} catch (IOException e) {
serviceLocalException(e);
} finally {
@@ -765,9 +766,9 @@ public abstract class DemandForwardingBr
}
}
};
-
+
remoteBroker.asyncRequest(message, callback);
-
+
}
} else {
if (LOG.isDebugEnabled()) {
@@ -1042,7 +1043,7 @@ public abstract class DemandForwardingBr
}
List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
- Collection<Subscription> currentSubs =
+ Collection<Subscription> currentSubs =
getRegionSubscriptions(consumerInfo.getDestination().isTopic());
for (Subscription sub : currentSubs) {
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
@@ -1070,7 +1071,7 @@ public abstract class DemandForwardingBr
if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
- + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
+ + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
+ existingSub + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
}
suppress = true;
@@ -1082,7 +1083,7 @@ public abstract class DemandForwardingBr
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
+ " with sub from " + remoteBrokerName
- + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
+ + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
+ candidateInfo.getNetworkConsumerIds());
}
} catch (IOException e) {
@@ -1113,7 +1114,7 @@ public abstract class DemandForwardingBr
private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
- AbstractRegion abstractRegion = (AbstractRegion)
+ AbstractRegion abstractRegion = (AbstractRegion)
(isTopic ? region.getTopicRegion() : region.getQueueRegion());
return abstractRegion.getSubscriptions().values();
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java?rev=1177797&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java Fri Sep 30 19:35:28 2011
@@ -0,0 +1,195 @@
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.Connection;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test involves the creation of a local and remote broker, both of which
+ * communicate over VM and TCP. The local broker establishes a bridge to the
+ * remote broker for the purposes of verifying that broker info is only
+ * transfered once the local broker's ID is known to the bridge support.
+ */
+public class AMQ3014Test {
+ // Change this URL to be an unused port.
+ private static final String REMOTE_BROKER_URL = "tcp://localhost:50000";
+
+ private List<BrokerInfo> remoteBrokerInfos = Collections
+ .synchronizedList(new ArrayList<BrokerInfo>());
+
+ private BrokerService localBroker = new BrokerService();
+
+ // Override the "remote" broker so that it records all (remote) BrokerInfos
+ // that it receives.
+ private BrokerService remoteBroker = new BrokerService() {
+ @Override
+ protected TransportConnector createTransportConnector(URI brokerURI)
+ throws Exception {
+ TransportServer transport = TransportFactory.bind(this, brokerURI);
+ return new TransportConnector(transport) {
+ @Override
+ protected Connection createConnection(Transport transport)
+ throws IOException {
+ Connection connection = super.createConnection(transport);
+ final TransportListener proxiedListener = transport
+ .getTransportListener();
+ transport.setTransportListener(new TransportListener() {
+
+ @Override
+ public void onCommand(Object command) {
+ if (command instanceof BrokerInfo) {
+ remoteBrokerInfos.add((BrokerInfo) command);
+ }
+ proxiedListener.onCommand(command);
+ }
+
+ @Override
+ public void onException(IOException error) {
+ proxiedListener.onException(error);
+ }
+
+ @Override
+ public void transportInterupted() {
+ proxiedListener.transportInterupted();
+ }
+
+ @Override
+ public void transportResumed() {
+ proxiedListener.transportResumed();
+ }
+ });
+ return connection;
+ }
+
+ };
+ }
+ };
+
+ @Before
+ public void init() throws Exception {
+ localBroker.setBrokerName("localBroker");
+ localBroker.setPersistent(false);
+ localBroker.setUseJmx(false);
+ localBroker.setSchedulerSupport(false);
+
+ remoteBroker.setBrokerName("remoteBroker");
+ remoteBroker.setPersistent(false);
+ remoteBroker.setUseJmx(false);
+ remoteBroker.addConnector(REMOTE_BROKER_URL);
+ remoteBroker.setSchedulerSupport(false);
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ try {
+ localBroker.stop();
+ } finally {
+ remoteBroker.stop();
+ }
+ }
+
+ /**
+ * This test verifies that the local broker's ID is typically known by the
+ * bridge support before the local broker's BrokerInfo is sent to the remote
+ * broker.
+ */
+ @Test
+ public void NormalCaseTest() throws Exception {
+ runTest(0, 3000);
+ }
+
+ /**
+ * This test verifies that timing can arise under which the local broker's
+ * ID is not known by the bridge support before the local broker's
+ * BrokerInfo is sent to the remote broker.
+ */
+ @Test
+ public void DelayedCaseTest() throws Exception {
+ runTest(500, 3000);
+ }
+
+ private void runTest(final long taskRunnerDelay, long timeout)
+ throws Exception {
+ // Add a network connector to the local broker that will create a bridge
+ // to the remote broker.
+ DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector();
+ SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
+ da.setServices(REMOTE_BROKER_URL);
+ dnc.setDiscoveryAgent(da);
+ localBroker.addNetworkConnector(dnc);
+
+ // Before starting the local broker, intercept the task runner factory
+ // so that the
+ // local VMTransport dispatcher is artificially delayed.
+ final TaskRunnerFactory realTaskRunnerFactory = localBroker
+ .getTaskRunnerFactory();
+ localBroker.setTaskRunnerFactory(new TaskRunnerFactory() {
+ public TaskRunner createTaskRunner(Task task, String name) {
+ final TaskRunner realTaskRunner = realTaskRunnerFactory
+ .createTaskRunner(task, name);
+ if (name.startsWith("ActiveMQ Connection Dispatcher: ")) {
+ return new TaskRunner() {
+ @Override
+ public void shutdown() throws InterruptedException {
+ realTaskRunner.shutdown();
+ }
+
+ @Override
+ public void shutdown(long timeout)
+ throws InterruptedException {
+ realTaskRunner.shutdown(timeout);
+ }
+
+ @Override
+ public void wakeup() throws InterruptedException {
+ Thread.sleep(taskRunnerDelay);
+ realTaskRunner.wakeup();
+ }
+ };
+ } else {
+ return realTaskRunnerFactory.createTaskRunner(task, name);
+ }
+ }
+ });
+
+ // Start the brokers and wait for the bridge to be created; the remote
+ // broker is started first to ensure it is available for the local
+ // broker to connect to.
+ remoteBroker.start();
+ localBroker.start();
+
+ // Wait for the remote broker to receive the local broker's BrokerInfo
+ // and then verify the local broker's ID is known.
+ long startTimeMillis = System.currentTimeMillis();
+ while (remoteBrokerInfos.isEmpty()
+ && (System.currentTimeMillis() - startTimeMillis) < timeout) {
+ Thread.sleep(100);
+ }
+
+ Assert.assertFalse("Timed out waiting for bridge to form.",
+ remoteBrokerInfos.isEmpty());
+ ;
+ Assert.assertNotNull("Local broker ID is null.", remoteBrokerInfos.get(
+ 0).getBrokerId());
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
------------------------------------------------------------------------------
svn:eol-style = native