You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/05/12 21:11:46 UTC
activemq git commit: unit test to verify matching durable sub can
statically forward topic
Repository: activemq
Updated Branches:
refs/heads/master c8a6171d0 -> 3c0a4d960
unit test to verify matching durable sub can statically forward topic
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3c0a4d96
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3c0a4d96
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3c0a4d96
Branch: refs/heads/master
Commit: 3c0a4d960ee99483c9160d0b6b80c27abd8f3319
Parents: c8a6171
Author: gtully <ga...@gmail.com>
Authored: Thu May 12 22:11:25 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu May 12 22:11:25 2016 +0100
----------------------------------------------------------------------
...woBrokerDurableForwardStaticRestartTest.java | 147 +++++++++++++++++++
1 file changed, 147 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/3c0a4d96/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerDurableForwardStaticRestartTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerDurableForwardStaticRestartTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerDurableForwardStaticRestartTest.java
new file mode 100644
index 0000000..2255fce
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerDurableForwardStaticRestartTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import java.io.File;
+import java.net.URI;
+import java.util.List;
+
+/**
+ * Checks that a topic is forwarded over static bridges BrokerA -> BrokerB -> BrokerC when matching
+ * durable subscriptions are pre-registered on the forwarding brokers, including across a BrokerB restart.
+ */
+public class TwoBrokerDurableForwardStaticRestartTest extends JmsMultipleBrokersTestSupport {
+    protected static final Logger LOG = LoggerFactory.getLogger(TwoBrokerDurableForwardStaticRestartTest.class);
+    // Same fixed port for both incarnations of BrokerB; presumably lets BrokerA's static bridge reconnect - confirm.
+    private static final String BROKER_B_URI = "broker:(tcp://0.0.0.0:61616)/BrokerB";
+    final ActiveMQTopic dest = new ActiveMQTopic("TEST.FOO");
+
+    /**
+     * Sends 100 messages from BrokerA to a consumer on BrokerC, then destroys BrokerB, sends 100 more,
+     * recreates and starts BrokerB, and expects the consumer to end up with all 200 messages.
+     */
+    public void testNonDurableReceiveThrougRestart() throws Exception {
+        bridgeBrokerPair("BrokerA", "BrokerB");
+        bridgeBrokerPair("BrokerB", "BrokerC");
+
+        registerDurableForwardSub("BrokerA", dest, "BrokerB");
+        registerDurableForwardSub("BrokerB", dest, "BrokerC");
+
+        startAllBrokers();
+        waitForBridgeFormation();
+
+        MessageConsumer clientC = createConsumer("BrokerC", dest);
+        sendMessages("BrokerA", dest, 100);
+
+        final MessageIdList messagesFromC = getConsumerMessages("BrokerC", clientC);
+        awaitMessageCount(messagesFromC, 100);
+
+        destroyBroker("BrokerB");
+        sendMessages("BrokerA", dest, 100); // published while BrokerB is gone
+
+        BrokerService broker = createBroker(new URI(BROKER_B_URI));
+        bridgeBrokerPair("BrokerB", "BrokerC");
+        broker.start();
+
+        awaitMessageCount(messagesFromC, 200);
+    }
+
+    /** Polls via Wait.waitFor for {@code expected} messages, logs what BrokerC's consumer got, then asserts the count. */
+    private void awaitMessageCount(final MessageIdList received, final int expected) throws Exception {
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                return received.getMessageCount() == expected;
+            }
+        });
+        LOG.info("C got: " + received.getMessageCount());
+        assertEquals(expected, received.getMessageCount());
+    }
+
+    @Override
+    protected void configureBroker(BrokerService broker) {
+        broker.getManagementContext().setCreateConnector(false);
+        broker.setAdvisorySupport(false);
+    }
+
+    /** Creates a durable subscriber for {@code dest} on {@code brokerName} and closes it again without unsubscribing. */
+    private void registerDurableForwardSub(String brokerName, ActiveMQTopic dest, String remoteBrokerName) throws Exception {
+        // need to match the durable sub that would be created by the bridge in response to a remote durable sub advisory
+        String clientId = "NC_" + remoteBrokerName + "_inbound_" + brokerName;
+        String subName = "NC-DS_" + brokerName + "_" + dest.getPhysicalName();
+
+        Connection c = brokers.get(brokerName).factory.createConnection();
+        try {
+            c.setClientID(clientId);
+            Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            TopicSubscriber topicSubscriber = session.createDurableSubscriber(dest, subName);
+            topicSubscriber.close();
+        } finally {
+            c.close(); // release the connection even when creating the subscription fails
+        }
+    }
+
+    /**
+     * Adds to the local broker a static network connector aimed at the remote broker's first transport connector.
+     *
+     * @throws Exception if the remote broker has no transport connectors
+     */
+    protected NetworkConnector bridgeBrokerPair(String localBrokerName, String remoteBrokerName) throws Exception {
+        BrokerService localBroker = brokers.get(localBrokerName).broker;
+        BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
+
+        List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
+        if (transportConnectors.isEmpty()) {
+            throw new Exception("Remote broker has no registered connectors.");
+        }
+        URI remoteURI = transportConnectors.get(0).getConnectUri();
+        NetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(" + remoteURI + ")"));
+        connector.setDynamicOnly(false); // so matching durable subs are loaded on start
+        connector.setMessageTTL(2);
+        connector.setStaticBridge(true);
+        localBroker.addNetworkConnector(connector);
+        return connector;
+    }
+
+    /** Wipes the default data directory, then creates (without starting) BrokerA, BrokerB and BrokerC. */
+    @Override
+    public void setUp() throws Exception {
+        File dataDir = new File(IOHelper.getDefaultDataDirectory());
+        LOG.info("Delete dataDir.." + dataDir.getCanonicalPath());
+        org.apache.activemq.TestSupport.recursiveDelete(dataDir);
+        super.setAutoFail(true);
+        super.setUp();
+        createBroker(new URI("broker:(tcp://0.0.0.0:0)/BrokerA"));
+        createBroker(new URI(BROKER_B_URI));
+        createBroker(new URI("broker:(tcp://0.0.0.0:0)/BrokerC"));
+    }
+}