You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/11/07 12:39:35 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-4852 - ensure clientId view connector mbean visible for duplex network connectors

Updated Branches:
  refs/heads/trunk ec5b15cc2 -> 50ec158e2


https://issues.apache.org/jira/browse/AMQ-4852 - ensure clientId view connector mbean visible for duplex network connectors


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/50ec158e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/50ec158e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/50ec158e

Branch: refs/heads/trunk
Commit: 50ec158e25d1ee81e788e29e15366e3955c8295f
Parents: ec5b15c
Author: gtully <ga...@gmail.com>
Authored: Thu Nov 7 11:34:56 2013 +0000
Committer: gtully <ga...@gmail.com>
Committed: Thu Nov 7 11:35:39 2013 +0000

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  |  8 +++++
 .../broker/jmx/TransportConnectorMBeanTest.java | 37 ++++++++++++++++++++
 .../network/DuplexNetworkMBeanTest.java         |  4 +--
 3 files changed, 47 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/50ec158e/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index ea61472..af10a94 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -119,6 +119,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
 
     private final AtomicBoolean started = new AtomicBoolean();
     private TransportConnection duplexInitiatingConnection;
+    private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
     protected BrokerService brokerService = null;
     private ObjectName mbeanObjectName;
     private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
@@ -610,6 +611,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                         } else {
                             switch (command.getDataStructureType()) {
                                 case ConnectionInfo.DATA_STRUCTURE_TYPE:
+                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
+                                    // end of initiating connection setup - propagate to initial connection to get mbean by clientid
+                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
+                                    } else {
+                                        localBroker.oneway(command);
+                                    }
+                                    break;
                                 case SessionInfo.DATA_STRUCTURE_TYPE:
                                     localBroker.oneway(command);
                                     break;

http://git-wip-us.apache.org/repos/asf/activemq/blob/50ec158e/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java
index 310f112..6f55e3d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/TransportConnectorMBeanTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
 
 import java.net.Socket;
@@ -26,7 +27,9 @@ import javax.management.ObjectName;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.util.JMXSupport;
+import org.apache.activemq.util.Wait;
 import org.junit.After;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -47,6 +50,40 @@ public class TransportConnectorMBeanTest {
         doVerifyRemoteAddressInMbeanName(false);
     }
 
+    @Test
+    public void verifyClientIdNetwork() throws Exception {
+        doVerifyClientIdNetwork(false);
+    }
+
+    @Test
+    public void verifyClientIdDuplexNetwork() throws Exception {
+        doVerifyClientIdNetwork(true);
+    }
+
+    private void doVerifyClientIdNetwork(boolean duplex) throws Exception {
+        createBroker(true);
+
+        BrokerService networked = new BrokerService();
+        networked.setBrokerName("networked");
+        networked.setPersistent(false);
+        NetworkConnector nc = networked.addNetworkConnector("static:" + broker.getTransportConnectors().get(0).getPublishableConnectString());
+        nc.setDuplex(duplex);
+        networked.start();
+
+        try {
+            assertTrue("presence of mbean with clientId", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    Set<ObjectName> registeredMbeans = getRegisteredMbeans();
+                    return match("_outbound", registeredMbeans);
+                }
+            }));
+
+        } finally {
+            networked.stop();
+        }
+    }
+
     private void doVerifyRemoteAddressInMbeanName(boolean allowRemoteAddress) throws Exception {
         createBroker(allowRemoteAddress);
         ActiveMQConnection connection = createConnection();

http://git-wip-us.apache.org/repos/asf/activemq/blob/50ec158e/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
index cf02bb2..213d4ae 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
@@ -66,7 +66,7 @@ public class DuplexNetworkMBeanTest {
                     networkedBroker.start();
                     assertEquals(1, countMbeans(networkedBroker, "networkBridge", 2000));
                     assertEquals(1, countMbeans(broker, "networkBridge", 2000));
-                    assertEquals(1, countMbeans(broker, "connectionName"));
+                    assertEquals(2, countMbeans(broker, "connectionName"));
                 } finally {
                     networkedBroker.stop();
                     networkedBroker.waitUntilStopped();
@@ -100,7 +100,7 @@ public class DuplexNetworkMBeanTest {
                 try {
                     broker.start();
                     assertEquals(1, countMbeans(networkedBroker, "networkBridge", 5000));
-                    assertEquals("restart number: " + i, 1, countMbeans(broker, "connectionName", 10000));
+                    assertEquals("restart number: " + i, 2, countMbeans(broker, "connectionName", 10000));
                 } finally {
                     broker.stop();
                     broker.waitUntilStopped();