You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/10/07 16:03:48 UTC
[2/3] git commit: fix deadlock that blocks remote broker start in
duplex case with durable sub recreation on restart - dynamicOnly=true works
around
fix deadlock that blocks remote broker start in duplex case with durable sub recreation on restart - dynamicOnly=true works around
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c1c82beb
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c1c82beb
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c1c82beb
Branch: refs/heads/trunk
Commit: c1c82beb2d6cea17eb5a8164a42dde1bba2ef77c
Parents: 57fc29b
Author: gtully <ga...@gmail.com>
Authored: Tue Oct 7 14:38:20 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Oct 7 14:50:41 2014 +0100
----------------------------------------------------------------------
.../network/DemandForwardingBridgeSupport.java | 24 +-
...DurableSubscriberWithNetworkRestartTest.java | 231 +++++++++++++++++++
2 files changed, 244 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c1c82beb/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 7d334ac..83eea31 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -427,6 +427,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
startRemoteBridge();
} catch (Throwable e) {
serviceRemoteException(e);
+ return;
+ }
+
+ try {
+ if (safeWaitUntilStarted()) {
+ setupStaticDestinations();
+ }
+ } catch (Throwable e) {
+ serviceLocalException(e);
}
}
@@ -509,14 +518,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
startedLatch.countDown();
localStartedLatch.countDown();
}
-
- if (!disposed.get()) {
- setupStaticDestinations();
- } else {
- LOG.warn("Network connection between {} and {} ({}) was interrupted during establishment.", new Object[]{
- localBroker, remoteBroker, remoteBrokerName
- });
- }
}
}
@@ -841,7 +842,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
-
+ LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
if (!disposed.get()) {
if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
// not a reason to terminate the bridge - temps can disappear with
@@ -1398,12 +1399,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
* Performs a timed wait on the started latch and then checks for disposed
* before performing another wait each time the the started wait times out.
*/
- protected void safeWaitUntilStarted() throws InterruptedException {
+ protected boolean safeWaitUntilStarted() throws InterruptedException {
while (!disposed.get()) {
if (startedLatch.await(1, TimeUnit.SECONDS)) {
- return;
+ break;
}
}
+ return !disposed.get();
}
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
http://git-wip-us.apache.org/repos/asf/activemq/blob/c1c82beb/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java
new file mode 100644
index 0000000..3799c6c
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkRestartTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Set;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+import static org.junit.Assume.assumeNotNull;
+
+
+public class DurableSubscriberWithNetworkRestartTest extends JmsMultipleBrokersTestSupport {
+ private static final Log LOG = LogFactory.getLog(DurableSubscriberWithNetworkRestartTest.class);
+ private static final String HUB = "HubBroker";
+ private static final String SPOKE = "SpokeBroker";
+ protected static final int MESSAGE_COUNT = 10;
+ public boolean dynamicOnly = false;
+
+ public void testSendOnAReceiveOnBWithTransportDisconnectDynamicOnly() throws Exception {
+ dynamicOnly = true;
+ try {
+ testSendOnAReceiveOnBWithTransportDisconnect();
+ } finally {
+ dynamicOnly = false;
+ }
+ }
+
+ public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
+ bridge(SPOKE, HUB);
+ startAllBrokers();
+
+ verifyDuplexBridgeMbean();
+
+ // Setup connection
+ URI hubURI = brokers.get(HUB).broker.getTransportConnectors().get(0).getPublishableConnectURI();
+ URI spokeURI = brokers.get(SPOKE).broker.getTransportConnectors().get(0).getPublishableConnectURI();
+ ActiveMQConnectionFactory facHub = new ActiveMQConnectionFactory(hubURI);
+ ActiveMQConnectionFactory facSpoke = new ActiveMQConnectionFactory(spokeURI);
+ Connection conHub = facHub.createConnection();
+ Connection conSpoke = facSpoke.createConnection();
+ conHub.setClientID("clientHUB");
+ conSpoke.setClientID("clientSPOKE");
+ conHub.start();
+ conSpoke.start();
+ Session sesHub = conHub.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sesSpoke = conSpoke.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
+ String consumerName = "consumerName";
+
+ // Setup consumers
+ MessageConsumer remoteConsumer = sesHub.createDurableSubscriber(topic, consumerName);
+ sleep(1000);
+ remoteConsumer.close();
+
+ // Setup producer
+ MessageProducer localProducer = sesSpoke.createProducer(topic);
+ localProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ final String payloadString = new String(new byte[10*1024]);
+ // Send messages
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message test = sesSpoke.createTextMessage("test-" + i);
+ test.setStringProperty("payload", payloadString);
+ localProducer.send(test);
+ }
+ localProducer.close();
+
+ final String options = "?persistent=true&useJmx=true&deleteAllMessagesOnStartup=false";
+ for (int i=0;i<2;i++) {
+ brokers.get(SPOKE).broker.stop();
+ sleep(1000);
+ createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
+ bridge(SPOKE, HUB);
+ brokers.get(SPOKE).broker.start();
+ LOG.info("restarted spoke..:" + i);
+
+ assertTrue("got mbeans on restart", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return countMbeans( brokers.get(HUB).broker, "networkBridge", 20000) == (dynamicOnly ? 1 : 2);
+ }
+ }));
+ }
+ }
+
+ private void verifyDuplexBridgeMbean() throws Exception {
+ assertEquals(1, countMbeans( brokers.get(HUB).broker, "networkBridge", 5000));
+ }
+
+ private int countMbeans(BrokerService broker, String type, int timeout) throws Exception {
+ final long expiryTime = System.currentTimeMillis() + timeout;
+
+ if (!type.contains("=")) {
+ type = type + "=*";
+ }
+
+ final ObjectName beanName = new ObjectName("org.apache.activemq:type=Broker,brokerName="
+ + broker.getBrokerName() + "," + type +",*");
+ Set<ObjectName> mbeans = null;
+ int count = 0;
+ do {
+ if (timeout > 0) {
+ Thread.sleep(100);
+ }
+
+ mbeans = broker.getManagementContext().queryNames(beanName, null);
+ if (mbeans != null) {
+ count = mbeans.size();
+ LOG.info("Found: " + count + ", matching type: " +type);
+ for (ObjectName objectName : mbeans) {
+ LOG.info("" + objectName);
+ }
+ //} else {
+ //logAllMbeans(broker);
+ }
+ } while ((mbeans == null || mbeans.isEmpty()) && expiryTime > System.currentTimeMillis());
+
+ // If port 1099 is in use when the Broker starts, starting the jmx connector
+ // will fail. So, if we have no mbsc to query, skip the test.
+ if (timeout > 0) {
+ assumeNotNull(mbeans);
+ }
+
+ return count;
+
+ }
+
+ private void logAllMbeans(BrokerService broker) throws MalformedURLException {
+ try {
+ // trace all existing MBeans
+ Set<?> all = broker.getManagementContext().queryNames(null, null);
+ LOG.info("Total MBean count=" + all.size());
+ for (Object o : all) {
+ //ObjectInstance bean = (ObjectInstance)o;
+ LOG.info(o);
+ }
+ } catch (Exception ignored) {
+ LOG.warn("getMBeanServer ex: " + ignored);
+ }
+ }
+
+ public NetworkConnector bridge(String from, String to) throws Exception {
+ NetworkConnector networkConnector = bridgeBrokers(from, to, dynamicOnly, -1, true);
+ networkConnector.setSuppressDuplicateQueueSubscriptions(true);
+ networkConnector.setDecreaseNetworkConsumerPriority(true);
+ networkConnector.setConsumerTTL(1);
+ networkConnector.setDuplex(true);
+ return networkConnector;
+ }
+
+ @Override
+ protected void startAllBrokers() throws Exception {
+ // Ensure HUB is started first so bridge will be active from the get go
+ BrokerItem brokerItem = brokers.get(HUB);
+ brokerItem.broker.start();
+ brokerItem = brokers.get(SPOKE);
+ brokerItem.broker.start();
+ sleep(600);
+ }
+
+ public void setUp() throws Exception {
+ super.setAutoFail(false);
+ super.setUp();
+ createBrokers(true);
+ }
+
+ private void createBrokers(boolean del) throws Exception {
+ final String options = "?persistent=true&useJmx=true&deleteAllMessagesOnStartup=" + del;
+ createBroker(new URI("broker:(tcp://localhost:61617)/" + HUB + options));
+ createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
+ }
+
+ protected void configureBroker(BrokerService broker) {
+ broker.setKeepDurableSubsActive(false);
+ broker.getManagementContext().setCreateConnector(false);
+ PolicyMap defaultPolcyMap = new PolicyMap();
+ PolicyEntry defaultPolicy = new PolicyEntry();
+ //defaultPolicy.setUseCache(false);
+ if (broker.getBrokerName().equals(HUB)) {
+ defaultPolicy.setStoreUsageHighWaterMark(2);
+ broker.getSystemUsage().getStoreUsage().setLimit(1*1024*1024);
+ }
+ defaultPolcyMap.setDefaultEntry(defaultPolicy);
+ broker.setDestinationPolicy(defaultPolcyMap);
+ broker.getSystemUsage().getMemoryUsage().setLimit(100*1024*1024);
+ }
+
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ private void sleep(int milliSecondTime) {
+ try {
+ Thread.sleep(milliSecondTime);
+ } catch (InterruptedException igonred) {
+ }
+ }
+}