You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/01/11 14:00:22 UTC

[1/2] activemq git commit: Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations https://issues.apache.org/jira/browse/AMQ-6052 closes #164

Repository: activemq
Updated Branches:
  refs/heads/master 8f6baf818 -> 3bb7e4a5c


Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations
https://issues.apache.org/jira/browse/AMQ-6052
closes #164


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3bb7e4a5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3bb7e4a5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3bb7e4a5

Branch: refs/heads/master
Commit: 3bb7e4a5c8497c8259a267e2eb11dddb8615a1ea
Parents: 6b1e874
Author: Altaflux <pa...@gmail.com>
Authored: Mon Jan 4 15:03:26 2016 -0600
Committer: gtully <ga...@gmail.com>
Committed: Mon Jan 11 12:59:33 2016 +0000

----------------------------------------------------------------------
 .../org/apache/activemq/network/MBeanBridgeDestination.java    | 6 +++---
 .../org/apache/activemq/network/DuplexNetworkMBeanTest.java    | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3bb7e4a5/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
index 888d295..479e382 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/activemq/blob/3bb7e4a5/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
index 1efb393..3736f7d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.


[2/2] activemq git commit: Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations

Posted by gt...@apache.org.
Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6b1e8741
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6b1e8741
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6b1e8741

Branch: refs/heads/master
Commit: 6b1e87410da4a2033c286fcaa758371e48da62ec
Parents: 8f6baf8
Author: Altaflux <pa...@gmail.com>
Authored: Mon Jan 4 15:00:00 2016 -0600
Committer: gtully <ga...@gmail.com>
Committed: Mon Jan 11 12:59:33 2016 +0000

----------------------------------------------------------------------
 .../network/MBeanBridgeDestination.java         | 144 ++++++++++---------
 .../network/DuplexNetworkMBeanTest.java         | 100 +++++++++++--
 2 files changed, 162 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6b1e8741/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
index bab5574..888d295 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,12 +16,6 @@
  */
 package org.apache.activemq.network;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.management.ObjectName;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.AnnotatedMBean;
 import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
@@ -33,6 +27,11 @@ import org.apache.activemq.thread.Scheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.ObjectName;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 public class MBeanBridgeDestination {
     private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
     private final BrokerService brokerService;
@@ -41,9 +40,8 @@ public class MBeanBridgeDestination {
     private final NetworkBridgeConfiguration networkBridgeConfiguration;
     private final Scheduler scheduler;
     private final Runnable purgeInactiveDestinationViewTask;
-    private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination, ObjectName>();
-    private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
-    private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
+    private final Map<ActiveMQDestination, NetworkDestinationContainer> outboundDestinationViewMap = new ConcurrentHashMap<>();
+    private final Map<ActiveMQDestination, NetworkDestinationContainer> inboundDestinationViewMap = new ConcurrentHashMap<>();
 
     public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) {
         this.brokerService = brokerService;
@@ -61,49 +59,48 @@ public class MBeanBridgeDestination {
 
     public void onOutboundMessage(Message message) {
         ActiveMQDestination destination = message.getDestination();
-        NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination);
-        if (networkDestinationView == null) {
-            synchronized (destinationObjectNameMap) {
-                if ((networkDestinationView = outboundDestinationViewMap.get(destination)) == null) {
-                    ObjectName bridgeObjectName = bridge.getMbeanObjectName();
-                    try {
-                        ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
-                        networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
-                        AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
-                        destinationObjectNameMap.put(destination, objectName);
-                        outboundDestinationViewMap.put(destination, networkDestinationView);
-
-                    } catch (Exception e) {
-                        LOG.warn("Failed to register " + destination, e);
-                    }
-                }
+        NetworkDestinationContainer networkDestinationContainer;
+
+        if ((networkDestinationContainer = outboundDestinationViewMap.get(destination)) == null) {
+            ObjectName bridgeObjectName = bridge.getMbeanObjectName();
+            try {
+                ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
+                NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
+                AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
+
+                networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
+                outboundDestinationViewMap.put(destination, networkDestinationContainer);
+                networkDestinationView.messageSent();
+            } catch (Exception e) {
+                LOG.warn("Failed to register " + destination, e);
             }
+        } else {
+            networkDestinationContainer.view.messageSent();
         }
-        networkDestinationView.messageSent();
     }
 
 
     public void onInboundMessage(Message message) {
         ActiveMQDestination destination = message.getDestination();
-        NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination);
-        if (networkDestinationView == null) {
-            synchronized (destinationObjectNameMap) {
-                if ((networkDestinationView = inboundDestinationViewMap.get(destination)) == null) {
-                    ObjectName bridgeObjectName = bridge.getMbeanObjectName();
-                    try {
-                        ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
-                        networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
-                        networkBridgeView.addNetworkDestinationView(networkDestinationView);
-                        AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
-                        destinationObjectNameMap.put(destination, objectName);
-                        inboundDestinationViewMap.put(destination, networkDestinationView);
-                    } catch (Exception e) {
-                        LOG.warn("Failed to register " + destination, e);
-                    }
-                }
+        NetworkDestinationContainer networkDestinationContainer;
+
+        if ((networkDestinationContainer = inboundDestinationViewMap.get(destination)) == null) {
+            ObjectName bridgeObjectName = bridge.getMbeanObjectName();
+            try {
+                ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
+                NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
+                AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
+
+                networkBridgeView.addNetworkDestinationView(networkDestinationView);
+                networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
+                inboundDestinationViewMap.put(destination, networkDestinationContainer);
+                networkDestinationView.messageSent();
+            } catch (Exception e) {
+                LOG.warn("Failed to register " + destination, e);
             }
+        } else {
+            networkDestinationContainer.view.messageSent();
         }
-        networkDestinationView.messageSent();
     }
 
     public void start() {
@@ -121,18 +118,22 @@ public class MBeanBridgeDestination {
         }
 
         scheduler.cancel(purgeInactiveDestinationViewTask);
-        for (ObjectName objectName : destinationObjectNameMap.values()) {
+        for (NetworkDestinationContainer networkDestinationContainer : inboundDestinationViewMap.values()) {
             try {
-                if (objectName != null) {
-                    brokerService.getManagementContext().unregisterMBean(objectName);
-                }
-            } catch (Throwable e) {
+                brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
+            } catch (Exception e) {
+                LOG.error("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
+            }
+        }
+        for (NetworkDestinationContainer networkDestinationContainer : outboundDestinationViewMap.values()) {
+            try {
+                brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
+            } catch (Exception e) {
                 LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
             }
         }
-        destinationObjectNameMap.clear();
-        outboundDestinationViewMap.clear();
         inboundDestinationViewMap.clear();
+        outboundDestinationViewMap.clear();
     }
 
     private void purgeInactiveDestinationViews() {
@@ -143,25 +144,32 @@ public class MBeanBridgeDestination {
         purgeInactiveDestinationView(outboundDestinationViewMap);
     }
 
-    private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationView> map) {
+    private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationContainer> map) {
         long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime();
-        for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : map.entrySet()) {
-            if (entry.getValue().getLastAccessTime() <= time) {
-                synchronized (destinationObjectNameMap) {
-                    map.remove(entry.getKey());
-                    ObjectName objectName = destinationObjectNameMap.remove(entry.getKey());
-                    if (objectName != null) {
-                        try {
-                            if (objectName != null) {
-                                brokerService.getManagementContext().unregisterMBean(objectName);
-                            }
-                        } catch (Throwable e) {
-                            LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
-                        }
+        for (Iterator<Map.Entry<ActiveMQDestination, NetworkDestinationContainer>> it = map.entrySet().iterator(); it.hasNext(); ) {
+            Map.Entry<ActiveMQDestination, NetworkDestinationContainer> entry = it.next();
+            if (entry.getValue().view.getLastAccessTime() <= time) {
+                ObjectName objectName = entry.getValue().objectName;
+                if (objectName != null) {
+                    try {
+                        brokerService.getManagementContext().unregisterMBean(entry.getValue().objectName);
+                    } catch (Throwable e) {
+                        LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
                     }
-                    entry.getValue().close();
                 }
+                entry.getValue().view.close();
+                it.remove();
             }
         }
     }
+
+    private static class NetworkDestinationContainer {
+        private final NetworkDestinationView view;
+        private final ObjectName objectName;
+
+        private NetworkDestinationContainer(NetworkDestinationView view, ObjectName objectName) {
+            this.view = view;
+            this.objectName = objectName;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6b1e8741/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
index bac271f..1efb393 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,17 +16,7 @@
  */
 package org.apache.activemq.network;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeNotNull;
-
-import java.net.MalformedURLException;
-import java.util.List;
-import java.util.Set;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectInstance;
-import javax.management.ObjectName;
-
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.ManagementContext;
 import org.apache.activemq.util.TestUtils;
@@ -35,6 +25,20 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import java.net.MalformedURLException;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNotNull;
+
 public class DuplexNetworkMBeanTest {
 
     protected static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkMBeanTest.class);
@@ -137,6 +141,74 @@ public class DuplexNetworkMBeanTest {
         }
     }
 
+    @Test
+    public void testMBeansNotOverwrittenOnCleanup() throws Exception {
+        BrokerService broker = createBroker();
+
+        BrokerService networkedBroker = createNetworkedBroker();
+        MessageProducer producerBroker = null;
+        MessageConsumer consumerBroker = null;
+        Session sessionNetworkBroker = null;
+        Session sessionBroker = null;
+        MessageProducer producerNetworkBroker = null;
+        MessageConsumer consumerNetworkBroker = null;
+        try {
+            broker.start();
+            broker.waitUntilStarted();
+            networkedBroker.start();
+            try {
+                assertEquals(2, countMbeans(networkedBroker, "connector=networkConnectors", 10000));
+                assertEquals(1, countMbeans(broker, "connector=duplexNetworkConnectors", 10000));
+
+                Connection brokerConnection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection();
+                brokerConnection.start();
+
+                sessionBroker = brokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                producerBroker = sessionBroker.createProducer(sessionBroker.createTopic("testTopic"));
+                consumerBroker = sessionBroker.createConsumer(sessionBroker.createTopic("testTopic"));
+                Connection netWorkBrokerConnection = new ActiveMQConnectionFactory(networkedBroker.getVmConnectorURI()).createConnection();
+                netWorkBrokerConnection.start();
+                sessionNetworkBroker = netWorkBrokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                producerNetworkBroker = sessionNetworkBroker.createProducer(sessionBroker.createTopic("testTopic"));
+                consumerNetworkBroker = sessionNetworkBroker.createConsumer(sessionBroker.createTopic("testTopic"));
+
+                assertEquals(4, countMbeans(broker, "destinationType=Topic,destinationName=testTopic", 15000));
+                assertEquals(4, countMbeans(networkedBroker, "destinationType=Topic,destinationName=testTopic", 15000));
+
+                producerBroker.send(sessionBroker.createTextMessage("test1"));
+                producerNetworkBroker.send(sessionNetworkBroker.createTextMessage("test2"));
+
+                assertEquals(2, countMbeans(networkedBroker, "destinationName=testTopic,direction=*", 10000));
+                assertEquals(2, countMbeans(broker, "destinationName=testTopic,direction=*", 10000));
+            } finally {
+                if (producerBroker != null) {
+                    producerBroker.close();
+                }
+                if (consumerBroker != null) {
+                    consumerBroker.close();
+                }
+                if (sessionBroker != null) {
+                    sessionBroker.close();
+                }
+                if (sessionNetworkBroker != null) {
+                    sessionNetworkBroker.close();
+                }
+                if (producerNetworkBroker != null) {
+                    producerNetworkBroker.close();
+                }
+                if (consumerNetworkBroker != null) {
+                    consumerNetworkBroker.close();
+                }
+                networkedBroker.stop();
+                networkedBroker.waitUntilStopped();
+            }
+            assertEquals(0, countMbeans(broker, "destinationName=testTopic,direction=*", 1500));
+        } finally {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
     private int countMbeans(BrokerService broker, String type) throws Exception {
         return countMbeans(broker, type, 0);
     }