You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/03/28 13:56:25 UTC

activemq git commit: [AMQ-6640] fix duplicate suppression sync request on responder end of duplex network connector only b/c that has the async local transport. Additional test. Ensure broker sync is conditional on the need for duplicate suppression whic

Repository: activemq
Updated Branches:
  refs/heads/master 0196be1d2 -> 8e00c6c2b


[AMQ-6640] fix duplicate suppression sync request on responder end of duplex network connector only b/c that has the async local transport. Additional test. Ensure broker sync is conditional on the need for duplicate suppression which should only be necessary in ring topologies when properly configured


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8e00c6c2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8e00c6c2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8e00c6c2

Branch: refs/heads/master
Commit: 8e00c6c2bc30e38cee585d9de97b511ed664951b
Parents: 0196be1
Author: gtully <ga...@gmail.com>
Authored: Tue Mar 28 14:49:23 2017 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Mar 28 14:49:23 2017 +0100

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  |  29 ++-
 .../org/apache/activemq/bugs/AMQ3274Test.java   |   4 +-
 .../usecases/DuplexAdvisoryRaceTest.java        | 247 +++++++++++++++++++
 3 files changed, 272 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8e00c6c2/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index f7dc745..459501c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -785,7 +785,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                                 case ConsumerInfo.DATA_STRUCTURE_TYPE:
                                     localStartedLatch.await();
                                     if (started.get()) {
-                                        addConsumerInfo((ConsumerInfo) command);
+                                        final ConsumerInfo consumerInfo = (ConsumerInfo) command;
+                                        if (isDuplicateSuppressionOff(consumerInfo)) {
+                                            addConsumerInfo(consumerInfo);
+                                        } else {
+                                            synchronized (brokerService.getVmConnectorURI()) {
+                                                addConsumerInfo(consumerInfo);
+                                            }
+                                        }
                                     } else {
                                         // received a subscription whilst stopping
                                         LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
@@ -867,8 +874,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
             // in a cyclic network there can be multiple bridges per broker that can propagate
             // a network subscription so there is a need to synchronize on a shared entity
-            synchronized (brokerService.getVmConnectorURI()) {
+            // if duplicate suppression is required
+            if (isDuplicateSuppressionOff(info)) {
                 addConsumerInfo(info);
+            } else {
+                synchronized (brokerService.getVmConnectorURI()) {
+                    addConsumerInfo(info);
+                }
             }
         } else if (data.getClass() == DestinationInfo.class) {
             // It's a destination info - we want to pass up information about temporary destinations
@@ -1027,8 +1039,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
     protected void addSubscription(DemandSubscription sub) throws IOException {
         if (sub != null) {
-            if (isDuplex()) {
-                // async vm transport, need to wait for completion
+            if (isCreatedByDuplex() && !isDuplicateSuppressionOff(sub.getRemoteInfo())) {
+                // async vm transport on duplex end, need to wait for completion
                 localBroker.request(sub.getLocalInfo());
             } else {
                 localBroker.oneway(sub.getLocalInfo());
@@ -1332,8 +1344,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
         boolean suppress = false;
 
-        if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
-                && !configuration.isSuppressDuplicateTopicSubscriptions()) {
+        if (isDuplicateSuppressionOff(consumerInfo)) {
             return suppress;
         }
 
@@ -1355,6 +1366,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         return suppress;
     }
 
+    private boolean isDuplicateSuppressionOff(final ConsumerInfo consumerInfo) {
+        return !configuration.isSuppressDuplicateQueueSubscriptions() && !configuration.isSuppressDuplicateTopicSubscriptions()
+                || consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions()
+                || consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions();
+    }
+
     private boolean isInActiveDurableSub(Subscription sub) {
         return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8e00c6c2/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
index 48c5cbb..f901d3d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
@@ -496,10 +496,10 @@ public class AMQ3274Test {
 
             if (queue_f) {
                 prefix = "queue";
-                excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
+                excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
             } else {
                 prefix = "topic";
-                excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
+                excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
             }
 
             excludes = new ArrayList<ActiveMQDestination>();

http://git-wip-us.apache.org/repos/asf/activemq/blob/8e00c6c2/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java
new file mode 100644
index 0000000..9919ec9
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkBridge;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.TestUtils;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertTrue;
+
+// https://issues.apache.org/jira/browse/AMQ-6640
+public class DuplexAdvisoryRaceTest {
+    private static final Logger LOG = LoggerFactory.getLogger(DuplexAdvisoryRaceTest.class);
+    private static String hostName;
+
+    final AtomicLong responseReceived = new AtomicLong(0);
+
+    BrokerService brokerA,brokerB;
+    String networkConnectorUrlString;
+
+    @BeforeClass
+    public static void initIp() throws Exception {
+        // attempt to bypass loopback - not vital but it helps to reproduce
+        hostName = InetAddress.getLocalHost().getHostAddress();
+    }
+
+    @Before
+    public void createBrokers() throws Exception {
+        networkConnectorUrlString = "tcp://" + hostName + ":" + TestUtils.findOpenPort();
+
+        brokerA = newBroker("A");
+        brokerB = newBroker("B");
+        responseReceived.set(0);
+    }
+
+    @After
+    public void stopBrokers() throws Exception {
+        brokerA.stop();
+        brokerB.stop();
+    }
+
+
+    // to be sure to be sure
+    public void repeatTestHang() throws Exception {
+        for (int i=0; i<10;i++) {
+            testHang();
+            stopBrokers();
+            createBrokers();
+        }
+    }
+
+    @Test
+    public void testHang() throws Exception {
+
+        brokerA.setPlugins(new BrokerPlugin[] { new BrokerPluginSupport() {
+            @Override
+            public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
+                Subscription subscription = super.addConsumer(context, info);
+                // delay return to allow dispatch to interleave
+                if (context.isNetworkConnection()) {
+                    TimeUnit.MILLISECONDS.sleep(300);
+                }
+                return subscription;
+            };
+        }});
+
+        // bridge
+        NetworkConnector networkConnector = bridgeBrokers(brokerA, brokerB);
+
+        brokerA.start();
+        brokerB.start();
+
+        ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString()
+                + "?jms.watchTopicAdvisories=false");
+
+        ActiveMQConnectionFactory brokerBFactory = new ActiveMQConnectionFactory(brokerB.getTransportConnectorByScheme("tcp").getPublishableConnectString()
+                + "?jms.watchTopicAdvisories=false");
+
+        // populate dests
+        final int numDests = 200;
+        final int numMessagesPerDest = 300;
+        final int numConsumersPerDest = 100;
+        populate(brokerAFactory, 0, numDests/2, numMessagesPerDest);
+        populate(brokerBFactory, numDests/2, numDests, numMessagesPerDest);
+
+        // demand
+        List<Connection> connections = new LinkedList<>();
+        connections.add(demand(brokerBFactory, 0, numDests/2, numConsumersPerDest));
+        connections.add(demand(brokerAFactory, numDests/2, numDests, numConsumersPerDest));
+
+
+        LOG.info("Allow duplex bridge to connect....");
+        // allow bridge to start
+        brokerB.startTransportConnector(brokerB.addConnector(networkConnectorUrlString + "?transport.socketBufferSize=1024"));
+
+       if (!Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("received: " + responseReceived.get());
+                return responseReceived.get() >= numMessagesPerDest * numDests;
+            }
+        }, 2*60*1000)) {
+
+           org.apache.activemq.TestSupport.dumpAllThreads("DD");
+
+           // when hung close will also hang!
+           for (NetworkBridge networkBridge : networkConnector.activeBridges()) {
+               if (networkBridge instanceof DemandForwardingBridge) {
+                   DemandForwardingBridge demandForwardingBridge = (DemandForwardingBridge) networkBridge;
+                   Socket socket = demandForwardingBridge.getRemoteBroker().narrow(Socket.class);
+                   socket.close();
+               }
+           }
+       }
+
+        networkConnector.stop();
+        for (Connection connection: connections) {
+            try {
+                connection.close();
+            } catch (Exception ignored) {}
+        }
+        assertTrue("received all sent: " + responseReceived.get(), responseReceived.get() >= numMessagesPerDest * numDests);
+    }
+
+
+    private void populate(ActiveMQConnectionFactory factory, int minDest, int maxDest, int numMessages) throws JMSException {
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final BytesMessage message = session.createBytesMessage();
+        //message.writeBytes(new byte[50]);
+        MessageProducer producer = session.createProducer(null);;
+        for (int i=minDest; i<maxDest; i++) {
+            Destination destination = qFromInt(i);
+            for (int j=0; j<numMessages; j++) {
+                producer.send(destination, message);
+            }
+        }
+        connection.close();
+    }
+
+    private Connection demand(ActiveMQConnectionFactory factory, int minDest, int maxDest, int numConsumers) throws Exception {
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        for (int i=minDest; i<maxDest; i++) {
+            Destination destination = qFromInt(i);
+            for (int j=0; j<numConsumers; j++) {
+                session.createConsumer(destination).setMessageListener(new MessageListener() {
+                    @Override
+                    public void onMessage(Message message) {
+                        responseReceived.incrementAndGet();
+                    }
+                });
+            }
+        }
+        connection.start();
+        return connection;
+    }
+
+    private Destination qFromInt(int val) {
+        StringBuilder builder = new StringBuilder();
+        String digits = String.format("%03d", val);
+        for (int i=0; i<3; i++) {
+            builder.append(digits.charAt(i));
+            if (i < 2) {
+                builder.append('.');
+            }
+        }
+        return new ActiveMQQueue("Test." + builder.toString());
+    }
+
+    private BrokerService newBroker(String name) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.setBrokerName(name);
+        brokerService.addConnector("tcp://" + hostName + ":0?transport.socketBufferSize=1024");
+
+        PolicyMap map = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setExpireMessagesPeriod(0);
+        map.setDefaultEntry(defaultEntry);
+        brokerService.setDestinationPolicy(map);
+        return brokerService;
+    }
+
+
+    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
+
+        String uri = "static:(failover:(" + networkConnectorUrlString + "?socketBufferSize=1024)?maxReconnectAttempts=0)";
+
+        NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
+        connector.setName(localBroker.getBrokerName() + "-to-" + remoteBroker.getBrokerName());
+        connector.setDuplex(true);
+        localBroker.addNetworkConnector(connector);
+        return connector;
+    }
+}
\ No newline at end of file