You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/01/11 14:00:22 UTC
[1/2] activemq git commit: Network of brokers on duplex mode reports
InstanceAlreadyExistsException on already existing destinations
https://issues.apache.org/jira/browse/AMQ-6052 closes #164
Repository: activemq
Updated Branches:
refs/heads/master 8f6baf818 -> 3bb7e4a5c
Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations
https://issues.apache.org/jira/browse/AMQ-6052
closes #164
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3bb7e4a5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3bb7e4a5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3bb7e4a5
Branch: refs/heads/master
Commit: 3bb7e4a5c8497c8259a267e2eb11dddb8615a1ea
Parents: 6b1e874
Author: Altaflux <pa...@gmail.com>
Authored: Mon Jan 4 15:03:26 2016 -0600
Committer: gtully <ga...@gmail.com>
Committed: Mon Jan 11 12:59:33 2016 +0000
----------------------------------------------------------------------
.../org/apache/activemq/network/MBeanBridgeDestination.java | 6 +++---
.../org/apache/activemq/network/DuplexNetworkMBeanTest.java | 6 +++---
2 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/3bb7e4a5/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
index 888d295..479e382 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
http://git-wip-us.apache.org/repos/asf/activemq/blob/3bb7e4a5/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
index 1efb393..3736f7d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
[2/2] activemq git commit: Network of brokers on duplex mode reports
InstanceAlreadyExistsException on already existing destinations
Posted by gt...@apache.org.
Network of brokers on duplex mode reports InstanceAlreadyExistsException on already existing destinations
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6b1e8741
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6b1e8741
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6b1e8741
Branch: refs/heads/master
Commit: 6b1e87410da4a2033c286fcaa758371e48da62ec
Parents: 8f6baf8
Author: Altaflux <pa...@gmail.com>
Authored: Mon Jan 4 15:00:00 2016 -0600
Committer: gtully <ga...@gmail.com>
Committed: Mon Jan 11 12:59:33 2016 +0000
----------------------------------------------------------------------
.../network/MBeanBridgeDestination.java | 144 ++++++++++---------
.../network/DuplexNetworkMBeanTest.java | 100 +++++++++++--
2 files changed, 162 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/6b1e8741/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
index bab5574..888d295 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,12 +16,6 @@
*/
package org.apache.activemq.network;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.management.ObjectName;
-
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
@@ -33,6 +27,11 @@ import org.apache.activemq.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.ObjectName;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
public class MBeanBridgeDestination {
private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
private final BrokerService brokerService;
@@ -41,9 +40,8 @@ public class MBeanBridgeDestination {
private final NetworkBridgeConfiguration networkBridgeConfiguration;
private final Scheduler scheduler;
private final Runnable purgeInactiveDestinationViewTask;
- private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination, ObjectName>();
- private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
- private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
+ private final Map<ActiveMQDestination, NetworkDestinationContainer> outboundDestinationViewMap = new ConcurrentHashMap<>();
+ private final Map<ActiveMQDestination, NetworkDestinationContainer> inboundDestinationViewMap = new ConcurrentHashMap<>();
public MBeanBridgeDestination(BrokerService brokerService, NetworkBridgeConfiguration networkBridgeConfiguration, NetworkBridge bridge, NetworkBridgeView networkBridgeView) {
this.brokerService = brokerService;
@@ -61,49 +59,48 @@ public class MBeanBridgeDestination {
public void onOutboundMessage(Message message) {
ActiveMQDestination destination = message.getDestination();
- NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination);
- if (networkDestinationView == null) {
- synchronized (destinationObjectNameMap) {
- if ((networkDestinationView = outboundDestinationViewMap.get(destination)) == null) {
- ObjectName bridgeObjectName = bridge.getMbeanObjectName();
- try {
- ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
- networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
- AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
- destinationObjectNameMap.put(destination, objectName);
- outboundDestinationViewMap.put(destination, networkDestinationView);
-
- } catch (Exception e) {
- LOG.warn("Failed to register " + destination, e);
- }
- }
+ NetworkDestinationContainer networkDestinationContainer;
+
+ if ((networkDestinationContainer = outboundDestinationViewMap.get(destination)) == null) {
+ ObjectName bridgeObjectName = bridge.getMbeanObjectName();
+ try {
+ ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
+ NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
+ AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
+
+ networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
+ outboundDestinationViewMap.put(destination, networkDestinationContainer);
+ networkDestinationView.messageSent();
+ } catch (Exception e) {
+ LOG.warn("Failed to register " + destination, e);
}
+ } else {
+ networkDestinationContainer.view.messageSent();
}
- networkDestinationView.messageSent();
}
public void onInboundMessage(Message message) {
ActiveMQDestination destination = message.getDestination();
- NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination);
- if (networkDestinationView == null) {
- synchronized (destinationObjectNameMap) {
- if ((networkDestinationView = inboundDestinationViewMap.get(destination)) == null) {
- ObjectName bridgeObjectName = bridge.getMbeanObjectName();
- try {
- ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
- networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
- networkBridgeView.addNetworkDestinationView(networkDestinationView);
- AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
- destinationObjectNameMap.put(destination, objectName);
- inboundDestinationViewMap.put(destination, networkDestinationView);
- } catch (Exception e) {
- LOG.warn("Failed to register " + destination, e);
- }
- }
+ NetworkDestinationContainer networkDestinationContainer;
+
+ if ((networkDestinationContainer = inboundDestinationViewMap.get(destination)) == null) {
+ ObjectName bridgeObjectName = bridge.getMbeanObjectName();
+ try {
+ ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
+ NetworkDestinationView networkDestinationView = new NetworkDestinationView(networkBridgeView, destination.getPhysicalName());
+ AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
+
+ networkBridgeView.addNetworkDestinationView(networkDestinationView);
+ networkDestinationContainer = new NetworkDestinationContainer(networkDestinationView, objectName);
+ inboundDestinationViewMap.put(destination, networkDestinationContainer);
+ networkDestinationView.messageSent();
+ } catch (Exception e) {
+ LOG.warn("Failed to register " + destination, e);
}
+ } else {
+ networkDestinationContainer.view.messageSent();
}
- networkDestinationView.messageSent();
}
public void start() {
@@ -121,18 +118,22 @@ public class MBeanBridgeDestination {
}
scheduler.cancel(purgeInactiveDestinationViewTask);
- for (ObjectName objectName : destinationObjectNameMap.values()) {
+ for (NetworkDestinationContainer networkDestinationContainer : inboundDestinationViewMap.values()) {
try {
- if (objectName != null) {
- brokerService.getManagementContext().unregisterMBean(objectName);
- }
- } catch (Throwable e) {
+ brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
+ } catch (Exception e) {
+ LOG.error("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
+ }
+ }
+ for (NetworkDestinationContainer networkDestinationContainer : outboundDestinationViewMap.values()) {
+ try {
+ brokerService.getManagementContext().unregisterMBean(networkDestinationContainer.objectName);
+ } catch (Exception e) {
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
}
}
- destinationObjectNameMap.clear();
- outboundDestinationViewMap.clear();
inboundDestinationViewMap.clear();
+ outboundDestinationViewMap.clear();
}
private void purgeInactiveDestinationViews() {
@@ -143,25 +144,32 @@ public class MBeanBridgeDestination {
purgeInactiveDestinationView(outboundDestinationViewMap);
}
- private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationView> map) {
+ private void purgeInactiveDestinationView(Map<ActiveMQDestination, NetworkDestinationContainer> map) {
long time = System.currentTimeMillis() - networkBridgeConfiguration.getGcSweepTime();
- for (Map.Entry<ActiveMQDestination, NetworkDestinationView> entry : map.entrySet()) {
- if (entry.getValue().getLastAccessTime() <= time) {
- synchronized (destinationObjectNameMap) {
- map.remove(entry.getKey());
- ObjectName objectName = destinationObjectNameMap.remove(entry.getKey());
- if (objectName != null) {
- try {
- if (objectName != null) {
- brokerService.getManagementContext().unregisterMBean(objectName);
- }
- } catch (Throwable e) {
- LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
- }
+ for (Iterator<Map.Entry<ActiveMQDestination, NetworkDestinationContainer>> it = map.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<ActiveMQDestination, NetworkDestinationContainer> entry = it.next();
+ if (entry.getValue().view.getLastAccessTime() <= time) {
+ ObjectName objectName = entry.getValue().objectName;
+ if (objectName != null) {
+ try {
+ brokerService.getManagementContext().unregisterMBean(entry.getValue().objectName);
+ } catch (Throwable e) {
+ LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
}
- entry.getValue().close();
}
+ entry.getValue().view.close();
+ it.remove();
}
}
}
+
+ private static class NetworkDestinationContainer {
+ private final NetworkDestinationView view;
+ private final ObjectName objectName;
+
+ private NetworkDestinationContainer(NetworkDestinationView view, ObjectName objectName) {
+ this.view = view;
+ this.objectName = objectName;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6b1e8741/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
index bac271f..1efb393 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,17 +16,7 @@
*/
package org.apache.activemq.network;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeNotNull;
-
-import java.net.MalformedURLException;
-import java.util.List;
-import java.util.Set;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectInstance;
-import javax.management.ObjectName;
-
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.util.TestUtils;
@@ -35,6 +25,20 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import java.net.MalformedURLException;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNotNull;
+
public class DuplexNetworkMBeanTest {
protected static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkMBeanTest.class);
@@ -137,6 +141,74 @@ public class DuplexNetworkMBeanTest {
}
}
+ @Test
+ public void testMBeansNotOverwrittenOnCleanup() throws Exception {
+ BrokerService broker = createBroker();
+
+ BrokerService networkedBroker = createNetworkedBroker();
+ MessageProducer producerBroker = null;
+ MessageConsumer consumerBroker = null;
+ Session sessionNetworkBroker = null;
+ Session sessionBroker = null;
+ MessageProducer producerNetworkBroker = null;
+ MessageConsumer consumerNetworkBroker = null;
+ try {
+ broker.start();
+ broker.waitUntilStarted();
+ networkedBroker.start();
+ try {
+ assertEquals(2, countMbeans(networkedBroker, "connector=networkConnectors", 10000));
+ assertEquals(1, countMbeans(broker, "connector=duplexNetworkConnectors", 10000));
+
+ Connection brokerConnection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection();
+ brokerConnection.start();
+
+ sessionBroker = brokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producerBroker = sessionBroker.createProducer(sessionBroker.createTopic("testTopic"));
+ consumerBroker = sessionBroker.createConsumer(sessionBroker.createTopic("testTopic"));
+ Connection netWorkBrokerConnection = new ActiveMQConnectionFactory(networkedBroker.getVmConnectorURI()).createConnection();
+ netWorkBrokerConnection.start();
+ sessionNetworkBroker = netWorkBrokerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ producerNetworkBroker = sessionNetworkBroker.createProducer(sessionBroker.createTopic("testTopic"));
+ consumerNetworkBroker = sessionNetworkBroker.createConsumer(sessionBroker.createTopic("testTopic"));
+
+ assertEquals(4, countMbeans(broker, "destinationType=Topic,destinationName=testTopic", 15000));
+ assertEquals(4, countMbeans(networkedBroker, "destinationType=Topic,destinationName=testTopic", 15000));
+
+ producerBroker.send(sessionBroker.createTextMessage("test1"));
+ producerNetworkBroker.send(sessionNetworkBroker.createTextMessage("test2"));
+
+ assertEquals(2, countMbeans(networkedBroker, "destinationName=testTopic,direction=*", 10000));
+ assertEquals(2, countMbeans(broker, "destinationName=testTopic,direction=*", 10000));
+ } finally {
+ if (producerBroker != null) {
+ producerBroker.close();
+ }
+ if (consumerBroker != null) {
+ consumerBroker.close();
+ }
+ if (sessionBroker != null) {
+ sessionBroker.close();
+ }
+ if (sessionNetworkBroker != null) {
+ sessionNetworkBroker.close();
+ }
+ if (producerNetworkBroker != null) {
+ producerNetworkBroker.close();
+ }
+ if (consumerNetworkBroker != null) {
+ consumerNetworkBroker.close();
+ }
+ networkedBroker.stop();
+ networkedBroker.waitUntilStopped();
+ }
+ assertEquals(0, countMbeans(broker, "destinationName=testTopic,direction=*", 1500));
+ } finally {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+
private int countMbeans(BrokerService broker, String type) throws Exception {
return countMbeans(broker, type, 0);
}