You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/07/19 19:20:09 UTC

activemq git commit: Revert "https://issues.apache.org/jira/browse/AMQ-6366"

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x a82bd3cf7 -> 4fc16630e


Revert "https://issues.apache.org/jira/browse/AMQ-6366"

This reverts commit a82bd3cf721e32272b2cf3dee3aa1afcc726c3cb.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4fc16630
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4fc16630
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4fc16630

Branch: refs/heads/activemq-5.13.x
Commit: 4fc16630ea94a8eda96ffa99a8e559ca4355c02a
Parents: a82bd3c
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Jul 19 15:19:58 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Jul 19 15:19:58 2016 -0400

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |  14 +-
 .../apache/activemq/usecases/AMQ6366Test.java   | 141 -------------------
 2 files changed, 4 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4fc16630/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index b16383a..350f529 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -27,7 +27,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
@@ -116,9 +115,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
     // The broker and wireformat info that was exchanged.
     protected BrokerInfo brokerInfo;
-    protected final List<Command> dispatchQueue = new LinkedList<>();
+    protected final List<Command> dispatchQueue = new LinkedList<Command>();
     protected TaskRunner taskRunner;
-    protected final AtomicReference<Throwable> transportException = new AtomicReference<>();
+    protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>();
     protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
     private final Transport transport;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
@@ -140,8 +139,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
     private final AtomicBoolean stopping = new AtomicBoolean(false);
     private final CountDownLatch stopped = new CountDownLatch(1);
     private final AtomicBoolean asyncException = new AtomicBoolean(false);
-    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<>();
-    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<>();
+    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
+    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
     private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
     private ConnectionContext context;
     private boolean networkConnection;
@@ -1395,11 +1394,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 listener.setCreatedByDuplex(true);
                 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
                 duplexBridge.setBrokerService(broker.getBrokerService());
-                Set<ActiveMQDestination> durableDestinations = broker.getDurableDestinations();
-                //Need to set durableDestinations to properly restart subs when dynamicOnly=false
-                if (durableDestinations != null) {
-                    duplexBridge.setDurableDestinations(broker.getDurableDestinations().toArray(new ActiveMQDestination[0]));
-                }
                 // now turn duplex off this side
                 info.setDuplexConnection(false);
                 duplexBridge.setCreatedByDuplex(true);

http://git-wip-us.apache.org/repos/asf/activemq/blob/4fc16630/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
deleted file mode 100644
index ec75232..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import java.io.File;
-import java.net.URI;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import javax.jms.MessageConsumer;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.DurableTopicSubscription;
-import org.apache.activemq.broker.region.Topic;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.util.IOHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Show that both directions of a duplex bridge will properly restart the
- * network durable consumers if dynamicOnly is false.
- */
-public class AMQ6366Test extends JmsMultipleBrokersTestSupport {
-    protected static final Logger LOG = LoggerFactory.getLogger(AMQ6366Test.class);
-    final ActiveMQTopic dest = new ActiveMQTopic("TEST.FOO");
-
-
-    /**
-     * This test works even before AMQ6366
-     * @throws Exception
-     */
-    public void testDuplexDurableSubRestarted() throws Exception {
-        testNonDurableReceiveThrougRestart("BrokerA", "BrokerB");
-    }
-
-    /**
-     * This test failed before AMQ6366 because the NC durable consumer was
-     * never properly activated.
-     *
-     * @throws Exception
-     */
-    public void testDuplexDurableSubRestartedReverse() throws Exception {
-        testNonDurableReceiveThrougRestart("BrokerB", "BrokerA");
-    }
-
-    protected void testNonDurableReceiveThrougRestart(String pubBroker, String conBroker) throws Exception {
-        NetworkConnector networkConnector = bridgeBrokerPair("BrokerA", "BrokerB");
-
-        startAllBrokers();
-        waitForBridgeFormation();
-
-        MessageConsumer client = createDurableSubscriber(conBroker, dest, "sub1");
-        client.close();
-
-        Thread.sleep(1000);
-        networkConnector.stop();
-        Thread.sleep(1000);
-
-        Set<ActiveMQDestination> durableDests = new HashSet<>();
-        durableDests.add(dest);
-        //Normally set on broker start from the persistence layer but
-        //simulate here since we just stopped and started the network connector
-        //without a restart
-        networkConnector.setDurableDestinations(durableDests);
-        networkConnector.start();
-        waitForBridgeFormation();
-
-        // Send messages
-        sendMessages(pubBroker, dest, 1);
-        Thread.sleep(1000);
-
-        Topic destination = (Topic) brokers.get(conBroker).broker.getDestination(dest);
-        DurableTopicSubscription sub = destination.getDurableTopicSubs().
-                values().toArray(new DurableTopicSubscription[0])[0];
-
-        //Assert that the message made it to the other broker
-        assertEquals(1, sub.getSubscriptionStatistics().getEnqueues().getCount());
-    }
-
-    @Override
-    protected void configureBroker(BrokerService broker) {
-        broker.getManagementContext().setCreateConnector(false);
-        broker.setAdvisorySupport(true);
-    }
-
-    protected NetworkConnector bridgeBrokerPair(String localBrokerName, String remoteBrokerName) throws Exception {
-        BrokerService localBroker = brokers.get(localBrokerName).broker;
-        BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
-
-        List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
-        URI remoteURI;
-        if (!transportConnectors.isEmpty()) {
-            remoteURI = transportConnectors.get(0).getConnectUri();
-            String uri = "static:(" + remoteURI + ")";
-            NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
-            connector.setDynamicOnly(false); // so matching durable subs are loaded on start
-            connector.setStaticBridge(false);
-            connector.setDuplex(true);
-            connector.addDynamicallyIncludedDestination(dest);
-            localBroker.addNetworkConnector(connector);
-            return connector;
-        } else {
-            throw new Exception("Remote broker has no registered connectors.");
-        }
-    }
-
-    @Override
-    public void setUp() throws Exception {
-        File dataDir = new File(IOHelper.getDefaultDataDirectory());
-        LOG.info("Delete dataDir.." + dataDir.getCanonicalPath());
-        org.apache.activemq.TestSupport.recursiveDelete(dataDir);
-        super.setAutoFail(true);
-        super.setUp();
-        createBroker(new URI(
-                "broker:(tcp://0.0.0.0:0)/BrokerA"));
-        createBroker(new URI(
-                "broker:(tcp://0.0.0.0:0)/BrokerB"));
-
-    }
-}