You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/12/04 20:02:37 UTC
git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4918
Updated Branches:
refs/heads/trunk 02ef9445d -> 489f92968
Fix for https://issues.apache.org/jira/browse/AMQ-4918
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/489f9296
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/489f9296
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/489f9296
Branch: refs/heads/trunk
Commit: 489f929686f29190e7a3aba252767ba38159bdde
Parents: 02ef944
Author: rajdavies <ra...@gmail.com>
Authored: Wed Dec 4 19:00:54 2013 +0000
Committer: rajdavies <ra...@gmail.com>
Committed: Wed Dec 4 19:02:25 2013 +0000
----------------------------------------------------------------------
.../activemq/broker/jmx/BrokerMBeanSupport.java | 15 +++
.../broker/jmx/NetworkDestinationView.java | 73 ++++++++++++
.../broker/jmx/NetworkDestinationViewMBean.java | 48 ++++++++
.../network/DemandForwardingBridgeSupport.java | 16 +++
.../network/MBeanBridgeDestination.java | 111 +++++++++++++++++++
.../activemq/network/MBeanNetworkListener.java | 34 +++++-
.../activemq/network/NetworkBridgeListener.java | 16 +++
.../activemq/management/SizeStatisticImpl.java | 2 +-
8 files changed, 312 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
index 3ad4495..87605bc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
@@ -173,6 +173,21 @@ public class BrokerMBeanSupport {
return new ObjectName(connectorName.getDomain(), map);
}
+ /**
+  * Builds the JMX object name for an outbound network destination by appending
+  * a {@code direction=outbound} key property and the destination's properties
+  * to the textual form of the network's object name.
+  *
+  * @param networkName object name of the owning network bridge
+  * @param destination destination whose properties are appended
+  * @return the composed object name
+  * @throws MalformedObjectNameException if the composed string is not a valid object name
+  */
+ public static ObjectName createNetworkOutBoundDestinationObjectName(ObjectName networkName, ActiveMQDestination destination) throws MalformedObjectNameException {
+     StringBuilder composed = new StringBuilder(networkName.toString());
+     composed.append(",direction=outbound").append(createDestinationProperties(destination));
+     return new ObjectName(composed.toString());
+ }
+
+ /**
+  * Builds the JMX object name for an inbound network destination by appending
+  * a {@code direction=inbound} key property and the destination's properties
+  * to the textual form of the network's object name.
+  *
+  * @param networkName object name of the owning network bridge
+  * @param destination destination whose properties are appended
+  * @return the composed object name
+  * @throws MalformedObjectNameException if the composed string is not a valid object name
+  */
+ public static ObjectName createNetworkInBoundDestinationObjectName(ObjectName networkName, ActiveMQDestination destination) throws MalformedObjectNameException {
+     StringBuilder composed = new StringBuilder(networkName.toString());
+     composed.append(",direction=inbound").append(createDestinationProperties(destination));
+     return new ObjectName(composed.toString());
+ }
+
+
/**
 * Convenience overload that converts the broker's object name to its string
 * form and delegates to the String-based {@code createProxyConnectorName}.
 *
 * @param brokerObjectName object name of the broker
 * @param type connector type passed through to the delegate
 * @param name connector name passed through to the delegate
 * @return the object name produced by the String-based overload
 * @throws MalformedObjectNameException if the delegate cannot build a valid name
 */
public static ObjectName createProxyConnectorName(ObjectName brokerObjectName, String type, String name) throws MalformedObjectNameException {
    String brokerName = brokerObjectName.toString();
    return createProxyConnectorName(brokerName, type, name);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
new file mode 100644
index 0000000..4edfd37
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.management.TimeStatisticImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JMX view of the message traffic for one destination on a network bridge.
+ * Each call to {@link #messageSent()} records the interval since the previous
+ * call in a {@link TimeStatisticImpl}; count and rate are read from that statistic.
+ */
+public class NetworkDestinationView implements NetworkDestinationViewMBean {
+    private static final Logger LOG = LoggerFactory.getLogger(NetworkDestinationView.class);
+    private final TimeStatisticImpl timeStatistic = new TimeStatisticImpl("networkEnqueue","network messages enqueued");
+
+    private final String name;
+    // Timestamp (ms) of the previous message; negative means "no message seen since creation/reset".
+    private long lastTime = -1;
+
+    /**
+     * @param name the name reported by {@link #getName()}
+     */
+    public NetworkDestinationView(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Returns the name of this destination.
+     */
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Resets the management counters and forgets the time of the last message.
+     */
+    @Override
+    public void resetStatistics() {
+        timeStatistic.reset();
+        lastTime = -1;
+    }
+
+    /**
+     * Returns the number of samples recorded by the underlying statistic,
+     * i.e. one per {@link #messageSent()} call since the last reset.
+     */
+    @Override
+    public long getCount() {
+        return timeStatistic.getCount();
+    }
+
+    /**
+     * Returns the average-per-second figure reported by the underlying time statistic.
+     */
+    @Override
+    public double getRate() {
+        return timeStatistic.getAveragePerSecond();
+    }
+
+    /**
+     * Records that a message was seen for this destination. The sample added to
+     * the statistic is the number of milliseconds since the previous message.
+     */
+    public void messageSent() {
+        long currentTime = System.currentTimeMillis();
+        // The first message after creation/reset has no predecessor, so it is recorded as a zero interval.
+        long elapsed = (lastTime < 0) ? 0 : currentTime - lastTime;
+        timeStatistic.addTime(elapsed);
+        lastTime = currentTime;
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationViewMBean.java
new file mode 100644
index 0000000..7164cdd
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationViewMBean.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+/**
+ * Management interface exposing per-destination statistics for messages
+ * crossing a network bridge.
+ */
+public interface NetworkDestinationViewMBean {
+
+ /**
+ * Returns the name of this destination.
+ *
+ * @return the destination name
+ */
+ @MBeanInfo("Name of this destination.")
+ String getName();
+
+ /**
+ * Resets the management counters.
+ */
+ @MBeanInfo("Resets statistics.")
+ void resetStatistics();
+
+ /**
+ * Returns the number of messages that have been sent to the destination.
+ *
+ * @return The number of messages that have been sent to the destination.
+ */
+ @MBeanInfo("Number of messages that have been sent to the destination.")
+ long getCount();
+
+ /**
+ * Returns the rate of messages that have been sent to the destination.
+ *
+ * @return The rate of messages that have been sent to the destination.
+ */
+ @MBeanInfo("rate of messages sent across the network destination.")
+ double getRate();
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index af10a94..bf0b4f6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -607,6 +607,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else {
duplexInboundLocalBroker.oneway(message);
}
+ serviceInboundMessage(message);
}
} else {
switch (command.getDataStructureType()) {
@@ -985,6 +986,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
sub.decrementOutstandingResponses();
}
}
+ serviceOutbound(message);
} else {
LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
}
@@ -1612,4 +1614,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
+ protected void serviceOutbound(Message message){
+ NetworkBridgeListener l = this.networkBridgeListener;
+ if (l != null){
+ l.onOutboundMessage(this,message);
+ }
+ }
+
+ protected void serviceInboundMessage(Message message){
+ NetworkBridgeListener l = this.networkBridgeListener;
+ if (l != null){
+ l.onInboundMessage(this,message);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
new file mode 100644
index 0000000..666f11e
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.management.ObjectName;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AnnotatedMBean;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.NetworkDestinationView;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MBeanBridgeDestination {
+ private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
+ private final BrokerService brokerService;
+ private final NetworkBridge bridge;
+ private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination, ObjectName>();
+ private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
+ private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
+
+ public MBeanBridgeDestination(BrokerService brokerService, NetworkBridge bridge) {
+ this.brokerService = brokerService;
+ this.bridge = bridge;
+ }
+
+
+ public void onOutboundMessage(Message message) {
+ ActiveMQDestination destination = message.getDestination();
+ NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination);
+ if (networkDestinationView == null) {
+ synchronized (destinationObjectNameMap) {
+ if (!destinationObjectNameMap.containsKey(destination)) {
+ ObjectName bridgeObjectName = bridge.getMbeanObjectName();
+ try {
+ ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
+ networkDestinationView = new NetworkDestinationView(destination.getPhysicalName());
+ AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
+ destinationObjectNameMap.put(destination, objectName);
+ outboundDestinationViewMap.put(destination, networkDestinationView);
+
+ } catch (Exception e) {
+ LOG.warn("Failed to register " + destination, e);
+ }
+ }
+ }
+ }
+ networkDestinationView.messageSent();
+ }
+
+
+ public void onInboundMessage(Message message) {
+ ActiveMQDestination destination = message.getDestination();
+ NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination);
+ if (networkDestinationView == null) {
+ synchronized (destinationObjectNameMap) {
+ if (!destinationObjectNameMap.containsKey(destination)) {
+ ObjectName bridgeObjectName = bridge.getMbeanObjectName();
+ try {
+ ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
+ networkDestinationView= new NetworkDestinationView(destination.getPhysicalName());
+ AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
+ destinationObjectNameMap.put(destination, objectName);
+ inboundDestinationViewMap.put(destination, networkDestinationView);
+ } catch (Exception e) {
+ LOG.warn("Failed to register " + destination, e);
+ }
+ }
+ }
+ }
+ networkDestinationView.messageSent();
+ }
+
+ public void close() {
+ if (!brokerService.isUseJmx()) {
+ return;
+ }
+
+ for (ObjectName objectName : destinationObjectNameMap.values()) {
+ try {
+ if (objectName != null) {
+ brokerService.getManagementContext().unregisterMBean(objectName);
+ }
+ } catch (Throwable e) {
+ LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
+ }
+ }
+ destinationObjectNameMap.clear();
+ outboundDestinationViewMap.clear();
+ inboundDestinationViewMap.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
index c9a9935..0481f3d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
@@ -16,14 +16,17 @@
*/
package org.apache.activemq.network;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.NetworkBridgeView;
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
+import org.apache.activemq.command.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +37,7 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
BrokerService brokerService;
ObjectName connectorName;
boolean createdByDuplex = false;
-
+ private Map<NetworkBridge,MBeanBridgeDestination> destinationObjectNameMap = new ConcurrentHashMap<NetworkBridge,MBeanBridgeDestination>();
public MBeanNetworkListener(BrokerService brokerService, ObjectName connectorName) {
this.brokerService = brokerService;
this.connectorName = connectorName;
@@ -55,6 +58,8 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
ObjectName objectName = createNetworkBridgeObjectName(bridge);
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, objectName);
bridge.setMbeanObjectName(objectName);
+ MBeanBridgeDestination mBeanBridgeDestination = new MBeanBridgeDestination(brokerService,bridge);
+ destinationObjectNameMap.put(bridge,mBeanBridgeDestination);
LOG.debug("registered: {} as: {}", bridge, objectName);
} catch (Throwable e) {
LOG.debug("Network bridge could not be registered in JMX: {}", e.getMessage(), e);
@@ -71,11 +76,17 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
if (objectName != null) {
brokerService.getManagementContext().unregisterMBean(objectName);
}
+ MBeanBridgeDestination mBeanBridgeDestination = destinationObjectNameMap.remove(bridge);
+ if (mBeanBridgeDestination != null){
+ mBeanBridgeDestination.close();
+ }
} catch (Throwable e) {
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
}
}
+
+
/**
 * Builds the JMX object name for a bridge from this listener's connector name
 * and the bridge's remote address, delegating to {@code BrokerMBeanSupport}.
 *
 * @param bridge the bridge to name
 * @return the object name produced by {@code BrokerMBeanSupport.createNetworkBridgeObjectName}
 * @throws MalformedObjectNameException if a valid object name cannot be built
 */
protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
return BrokerMBeanSupport.createNetworkBridgeObjectName(connectorName, bridge.getRemoteAddress());
}
@@ -83,4 +94,23 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
public void setCreatedByDuplex(boolean createdByDuplex) {
this.createdByDuplex = createdByDuplex;
}
+
+
+
+ /**
+  * Forwards an outbound-message notification to the destination tracker
+  * registered for the bridge; bridges without a tracker are ignored.
+  */
+ @Override
+ public void onOutboundMessage(NetworkBridge bridge, Message message) {
+     final MBeanBridgeDestination tracker = destinationObjectNameMap.get(bridge);
+     if (tracker == null) {
+         return;
+     }
+     tracker.onOutboundMessage(message);
+ }
+
+ /**
+  * Forwards an inbound-message notification to the destination tracker
+  * registered for the bridge; bridges without a tracker are ignored.
+  */
+ @Override
+ public void onInboundMessage(NetworkBridge bridge, Message message) {
+     final MBeanBridgeDestination tracker = destinationObjectNameMap.get(bridge);
+     if (tracker == null) {
+         return;
+     }
+     tracker.onInboundMessage(message);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
index 7d49177..c30a999 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.network;
+import org.apache.activemq.command.Message;
+
/**
* called when a bridge fails
*
@@ -38,4 +40,18 @@ public interface NetworkBridgeListener {
*/
void onStop(NetworkBridge bridge);
+ /**
+ * Called when a message is forwarded over the network.
+ *
+ * @param bridge the bridge reporting the message
+ * @param message the message being forwarded
+ */
+ void onOutboundMessage (NetworkBridge bridge,Message message);
+
+ /**
+ * Called when a message arrives over the network.
+ *
+ * @param bridge the bridge reporting the message
+ * @param message the message that arrived
+ */
+ void onInboundMessage (NetworkBridge bridge,Message message);
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
index 14664d2..d3dbb50 100644
--- a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
+++ b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
@@ -154,7 +154,7 @@ public class SizeStatisticImpl extends StatisticImpl{
buffer.append(Long.toString(minSize));
buffer.append(" totalSize: ");
buffer.append(Long.toString(totalSize));
- buffer.append(" averageTime: ");
+ buffer.append(" averageSize: ");
buffer.append(Double.toString(getAverageSize()));
buffer.append(" averageTimeExMinMax: ");
buffer.append(Double.toString(getAveragePerSecondExcludingMinMax()));