You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/07/19 19:20:09 UTC
activemq git commit: Revert "https://issues.apache.org/jira/browse/AMQ-6366"
Repository: activemq
Updated Branches:
refs/heads/activemq-5.13.x a82bd3cf7 -> 4fc16630e
Revert "https://issues.apache.org/jira/browse/AMQ-6366"
This reverts commit a82bd3cf721e32272b2cf3dee3aa1afcc726c3cb.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4fc16630
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4fc16630
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4fc16630
Branch: refs/heads/activemq-5.13.x
Commit: 4fc16630ea94a8eda96ffa99a8e559ca4355c02a
Parents: a82bd3c
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Jul 19 15:19:58 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Jul 19 15:19:58 2016 -0400
----------------------------------------------------------------------
.../activemq/broker/TransportConnection.java | 14 +-
.../apache/activemq/usecases/AMQ6366Test.java | 141 -------------------
2 files changed, 4 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/4fc16630/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index b16383a..350f529 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -27,7 +27,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -116,9 +115,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
// The broker and wireformat info that was exchanged.
protected BrokerInfo brokerInfo;
- protected final List<Command> dispatchQueue = new LinkedList<>();
+ protected final List<Command> dispatchQueue = new LinkedList<Command>();
protected TaskRunner taskRunner;
- protected final AtomicReference<Throwable> transportException = new AtomicReference<>();
+ protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>();
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
@@ -140,8 +139,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final CountDownLatch stopped = new CountDownLatch(1);
private final AtomicBoolean asyncException = new AtomicBoolean(false);
- private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<>();
- private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<>();
+ private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
+ private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
private ConnectionContext context;
private boolean networkConnection;
@@ -1395,11 +1394,6 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
listener.setCreatedByDuplex(true);
duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
duplexBridge.setBrokerService(broker.getBrokerService());
- Set<ActiveMQDestination> durableDestinations = broker.getDurableDestinations();
- //Need to set durableDestinations to properly restart subs when dynamicOnly=false
- if (durableDestinations != null) {
- duplexBridge.setDurableDestinations(broker.getDurableDestinations().toArray(new ActiveMQDestination[0]));
- }
// now turn duplex off this side
info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true);
http://git-wip-us.apache.org/repos/asf/activemq/blob/4fc16630/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
deleted file mode 100644
index ec75232..0000000
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import java.io.File;
-import java.net.URI;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import javax.jms.MessageConsumer;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.DurableTopicSubscription;
-import org.apache.activemq.broker.region.Topic;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.util.IOHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Show that both directions of a duplex bridge will properly restart the
- * network durable consumers if dynamicOnly is false.
- */
-public class AMQ6366Test extends JmsMultipleBrokersTestSupport {
- protected static final Logger LOG = LoggerFactory.getLogger(AMQ6366Test.class);
- final ActiveMQTopic dest = new ActiveMQTopic("TEST.FOO");
-
-
- /**
- * This test works even before AMQ6366
- * @throws Exception
- */
- public void testDuplexDurableSubRestarted() throws Exception {
- testNonDurableReceiveThrougRestart("BrokerA", "BrokerB");
- }
-
- /**
- * This test failed before AMQ6366 because the NC durable consumer was
- * never properly activated.
- *
- * @throws Exception
- */
- public void testDuplexDurableSubRestartedReverse() throws Exception {
- testNonDurableReceiveThrougRestart("BrokerB", "BrokerA");
- }
-
- protected void testNonDurableReceiveThrougRestart(String pubBroker, String conBroker) throws Exception {
- NetworkConnector networkConnector = bridgeBrokerPair("BrokerA", "BrokerB");
-
- startAllBrokers();
- waitForBridgeFormation();
-
- MessageConsumer client = createDurableSubscriber(conBroker, dest, "sub1");
- client.close();
-
- Thread.sleep(1000);
- networkConnector.stop();
- Thread.sleep(1000);
-
- Set<ActiveMQDestination> durableDests = new HashSet<>();
- durableDests.add(dest);
- //Normally set on broker start from the persistence layer but
- //simulate here since we just stopped and started the network connector
- //without a restart
- networkConnector.setDurableDestinations(durableDests);
- networkConnector.start();
- waitForBridgeFormation();
-
- // Send messages
- sendMessages(pubBroker, dest, 1);
- Thread.sleep(1000);
-
- Topic destination = (Topic) brokers.get(conBroker).broker.getDestination(dest);
- DurableTopicSubscription sub = destination.getDurableTopicSubs().
- values().toArray(new DurableTopicSubscription[0])[0];
-
- //Assert that the message made it to the other broker
- assertEquals(1, sub.getSubscriptionStatistics().getEnqueues().getCount());
- }
-
- @Override
- protected void configureBroker(BrokerService broker) {
- broker.getManagementContext().setCreateConnector(false);
- broker.setAdvisorySupport(true);
- }
-
- protected NetworkConnector bridgeBrokerPair(String localBrokerName, String remoteBrokerName) throws Exception {
- BrokerService localBroker = brokers.get(localBrokerName).broker;
- BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
-
- List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
- URI remoteURI;
- if (!transportConnectors.isEmpty()) {
- remoteURI = transportConnectors.get(0).getConnectUri();
- String uri = "static:(" + remoteURI + ")";
- NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
- connector.setDynamicOnly(false); // so matching durable subs are loaded on start
- connector.setStaticBridge(false);
- connector.setDuplex(true);
- connector.addDynamicallyIncludedDestination(dest);
- localBroker.addNetworkConnector(connector);
- return connector;
- } else {
- throw new Exception("Remote broker has no registered connectors.");
- }
- }
-
- @Override
- public void setUp() throws Exception {
- File dataDir = new File(IOHelper.getDefaultDataDirectory());
- LOG.info("Delete dataDir.." + dataDir.getCanonicalPath());
- org.apache.activemq.TestSupport.recursiveDelete(dataDir);
- super.setAutoFail(true);
- super.setUp();
- createBroker(new URI(
- "broker:(tcp://0.0.0.0:0)/BrokerA"));
- createBroker(new URI(
- "broker:(tcp://0.0.0.0:0)/BrokerB"));
-
- }
-}