You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/03/29 22:51:08 UTC

[2/2] activemq git commit: [AMQ-6640] align use of sync vm transport usage on duplex end of networkconnector with initiator end. only duplexinbound for forwarding is async to allow thread for responses. vm transport options applied in one place and test

[AMQ-6640] Align sync vm transport usage on the duplex end of a networkconnector with the initiator end. Only the duplexinbound transport used for forwarding is async, to allow a thread for responses. vm transport options are applied in one place and the test is more deterministic w.r.t. the hang scenario.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d84a5865
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d84a5865
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d84a5865

Branch: refs/heads/master
Commit: d84a58656c0b3fcb2aad04e47ef843bf379a25f0
Parents: 770a73e
Author: gtully <ga...@gmail.com>
Authored: Wed Mar 29 23:50:47 2017 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Mar 29 23:50:47 2017 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |  8 +------
 .../activemq/broker/TransportConnection.java    |  2 +-
 .../network/DemandForwardingBridgeSupport.java  | 22 ++++++++++++--------
 .../activemq/network/NetworkBridgeFactory.java  | 13 +++++++++---
 .../activemq/network/NetworkConnector.java      |  2 +-
 .../activemq/network/NetworkRouteTest.java      |  6 +++---
 .../usecases/DuplexAdvisoryRaceTest.java        | 11 +++++-----
 .../src/test/resources/log4j.properties         |  1 +
 8 files changed, 35 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 8cd15d8..de70d29 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -2617,12 +2617,6 @@ public class BrokerService implements Service {
             setTransportConnectors(al);
         }
         this.slave = false;
-        URI uri = getVmConnectorURI();
-        Map<String, String> map = new HashMap<>(URISupport.parseParameters(uri));
-        map.put("async", "false");
-        map.put("create","false");
-        uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
-
         if (!stopped.get()) {
             ThreadPoolExecutor networkConnectorStartExecutor = null;
             if (isNetworkConnectorStartAsync()) {
@@ -2642,7 +2636,7 @@ public class BrokerService implements Service {
 
             for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
                 final NetworkConnector connector = iter.next();
-                connector.setLocalUri(uri);
+                connector.setLocalUri(getVmConnectorURI());
                 startNetworkConnector(connector, durableDestinations, networkConnectorStartExecutor);
             }
             if (networkConnectorStartExecutor != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index a58eda3..69d29bc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -1428,7 +1428,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                     }
                     setDuplexNetworkConnectorId(duplexNetworkConnectorId);
                 }
-                Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker);
+                Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker.getVmConnectorURI());
                 Transport remoteBridgeTransport = transport;
                 if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
                     // the vm transport case is already wrapped

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 459501c..e343ad6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -193,7 +193,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
             networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
 
             if (isDuplex()) {
-                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
+                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalAsyncTransport(brokerService.getBroker().getVmConnectorURI());
                 duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
 
                     @Override
@@ -830,9 +830,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         demandConsumerDispatched++;
         if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
                 (configuration.getAdvisoryAckPercentage() / 100f))) {
-            MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
+            final MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
             ack.setConsumerId(demandConsumerInfo.getConsumerId());
-            remoteBroker.oneway(ack);
+            brokerService.getTaskRunnerFactory().execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        remoteBroker.oneway(ack);
+                    } catch (IOException e) {
+                        LOG.warn("Failed to send advisory ack " + ack, e);
+                    }
+                }
+            });
             demandConsumerDispatched = 0;
         }
     }
@@ -1039,12 +1048,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
     protected void addSubscription(DemandSubscription sub) throws IOException {
         if (sub != null) {
-            if (isCreatedByDuplex() && !isDuplicateSuppressionOff(sub.getRemoteInfo())) {
-                // async vm transport on duplex end, need to wait for completion
-                localBroker.request(sub.getLocalInfo());
-            } else {
-                localBroker.oneway(sub.getLocalInfo());
-            }
+            localBroker.oneway(sub.getLocalInfo());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
index 0e938ae..32711a4 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
@@ -58,10 +58,17 @@ public final class NetworkBridgeFactory {
         return result;
     }
 
-    public static Transport createLocalTransport(Broker broker) throws Exception {
-        URI uri = broker.getVmConnectorURI();
+    public static Transport createLocalTransport(URI uri) throws Exception {
+        return createLocalTransport(uri, false);
+    }
+
+    public static Transport createLocalAsyncTransport(URI uri) throws Exception {
+        return createLocalTransport(uri, true);
+    }
+
+    private static Transport createLocalTransport(URI uri, boolean async) throws Exception {
         HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
-        map.put("async", "true");
+        map.put("async", String.valueOf(async));
         map.put("create", "false"); // we don't want a vm connect during shutdown to trigger a broker create
         uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
         return TransportFactory.connect(uri);

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java
index 5faf94c..f943b82 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkConnector.java
@@ -140,7 +140,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
     }
 
     protected Transport createLocalTransport() throws Exception {
-        return TransportFactory.connect(localURI);
+        return NetworkBridgeFactory.createLocalTransport(localURI);
     }
 
     public static ActiveMQDestination[] getDurableTopicDestinations(final Set<ActiveMQDestination> durableDestinations) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
index 2b363b3..afa438e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkRouteTest.java
@@ -63,7 +63,7 @@ public class NetworkRouteTest {
 
     @Test
     public void verifyNoRemoveOnOneConduitRemove() throws Exception {
-        EasyMock.expect(localBroker.request(EasyMock.isA(ConsumerInfo.class))).andReturn(null);
+        localBroker.oneway(EasyMock.isA(ConsumerInfo.class));
         control.replay();
 
         remoteListener.onCommand(path2Msg);
@@ -76,7 +76,7 @@ public class NetworkRouteTest {
     @Test
     public void addAndRemoveOppositeOrder() throws Exception {
         // from (1)
-        localBroker.request(EasyMock.isA(ConsumerInfo.class));
+        localBroker.oneway(EasyMock.isA(ConsumerInfo.class));
         ArgHolder localConsumer = ArgHolder.holdArgsForLastObjectCall();
         // from (2a)
         remoteBroker.asyncRequest(EasyMock.isA(ActiveMQMessage.class), EasyMock.isA(ResponseCallback.class));
@@ -123,7 +123,7 @@ public class NetworkRouteTest {
     @Test
     public void addAndRemoveSameOrder() throws Exception {
         // from (1)
-        localBroker.request(EasyMock.isA(ConsumerInfo.class));
+        localBroker.oneway(EasyMock.isA(ConsumerInfo.class));
         ArgHolder localConsumer = ArgHolder.holdArgsForLastObjectCall();
 
         // from (2a)

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java
index 9919ec9..b34b67e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DuplexAdvisoryRaceTest.java
@@ -126,9 +126,9 @@ public class DuplexAdvisoryRaceTest {
                 + "?jms.watchTopicAdvisories=false");
 
         // populate dests
-        final int numDests = 200;
-        final int numMessagesPerDest = 300;
-        final int numConsumersPerDest = 100;
+        final int numDests = 800;
+        final int numMessagesPerDest = 50;
+        final int numConsumersPerDest = 5;
         populate(brokerAFactory, 0, numDests/2, numMessagesPerDest);
         populate(brokerBFactory, numDests/2, numDests, numMessagesPerDest);
 
@@ -148,7 +148,7 @@ public class DuplexAdvisoryRaceTest {
                 LOG.info("received: " + responseReceived.get());
                 return responseReceived.get() >= numMessagesPerDest * numDests;
             }
-        }, 2*60*1000)) {
+        }, 5*60*1000)) {
 
            org.apache.activemq.TestSupport.dumpAllThreads("DD");
 
@@ -177,7 +177,6 @@ public class DuplexAdvisoryRaceTest {
         connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         final BytesMessage message = session.createBytesMessage();
-        //message.writeBytes(new byte[50]);
         MessageProducer producer = session.createProducer(null);;
         for (int i=minDest; i<maxDest; i++) {
             Destination destination = qFromInt(i);
@@ -236,7 +235,7 @@ public class DuplexAdvisoryRaceTest {
 
     protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker) throws Exception {
 
-        String uri = "static:(failover:(" + networkConnectorUrlString + "?socketBufferSize=1024)?maxReconnectAttempts=0)";
+        String uri = "static:(failover:(" + networkConnectorUrlString + "?socketBufferSize=1024&trace=false)?maxReconnectAttempts=0)";
 
         NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
         connector.setName(localBroker.getBrokerName() + "-to-" + remoteBroker.getBrokerName());

http://git-wip-us.apache.org/repos/asf/activemq/blob/d84a5865/activemq-unit-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/log4j.properties b/activemq-unit-tests/src/test/resources/log4j.properties
index 4704dbc..42d8c80 100644
--- a/activemq-unit-tests/src/test/resources/log4j.properties
+++ b/activemq-unit-tests/src/test/resources/log4j.properties
@@ -24,6 +24,7 @@ log4j.rootLogger=INFO, out, stdout
 #log4j.logger.org.apache.activemq.store.kahadb.scheduler=DEBUG
 #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
 #log4j.logger.org.apache.activemq.transport.failover=TRACE
+#log4j.logger.org.apache.activemq.transport.TransportLogger.Connection=TRACE
 #log4j.logger.org.apache.activemq.store.jdbc=TRACE
 #log4j.logger.org.apache.activemq.store.kahadb=TRACE
 #log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG