You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2013/12/04 20:02:37 UTC

git commit: Fix for https://issues.apache.org/jira/browse/AMQ-4918

Updated Branches:
  refs/heads/trunk 02ef9445d -> 489f92968


Fix for https://issues.apache.org/jira/browse/AMQ-4918


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/489f9296
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/489f9296
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/489f9296

Branch: refs/heads/trunk
Commit: 489f929686f29190e7a3aba252767ba38159bdde
Parents: 02ef944
Author: rajdavies <ra...@gmail.com>
Authored: Wed Dec 4 19:00:54 2013 +0000
Committer: rajdavies <ra...@gmail.com>
Committed: Wed Dec 4 19:02:25 2013 +0000

----------------------------------------------------------------------
 .../activemq/broker/jmx/BrokerMBeanSupport.java |  15 +++
 .../broker/jmx/NetworkDestinationView.java      |  73 ++++++++++++
 .../broker/jmx/NetworkDestinationViewMBean.java |  48 ++++++++
 .../network/DemandForwardingBridgeSupport.java  |  16 +++
 .../network/MBeanBridgeDestination.java         | 111 +++++++++++++++++++
 .../activemq/network/MBeanNetworkListener.java  |  34 +++++-
 .../activemq/network/NetworkBridgeListener.java |  16 +++
 .../activemq/management/SizeStatisticImpl.java  |   2 +-
 8 files changed, 312 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
index 3ad4495..87605bc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerMBeanSupport.java
@@ -173,6 +173,21 @@ public class BrokerMBeanSupport {
         return new ObjectName(connectorName.getDomain(), map);
     }
 
+    /**
+     * Builds the ObjectName formed by appending ",direction=outbound" and the destination properties to the network name.
+     */
+    public static ObjectName createNetworkOutBoundDestinationObjectName(ObjectName networkName, ActiveMQDestination destination) throws MalformedObjectNameException {
+        return new ObjectName(networkName.toString() + ",direction=outbound" + createDestinationProperties(destination));
+    }
+
+    /**
+     * Builds the ObjectName formed by appending ",direction=inbound" and the destination properties to the network name.
+     */
+    public static ObjectName createNetworkInBoundDestinationObjectName(ObjectName networkName, ActiveMQDestination destination) throws MalformedObjectNameException {
+        return new ObjectName(networkName.toString() + ",direction=inbound" + createDestinationProperties(destination));
+    }
+
+
     public static ObjectName createProxyConnectorName(ObjectName brokerObjectName, String type, String name) throws MalformedObjectNameException {
         return createProxyConnectorName(brokerObjectName.toString(), type, name);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
new file mode 100644
index 0000000..4edfd37
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationView.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+import org.apache.activemq.management.TimeStatisticImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** JMX view that tracks how many messages were sent for one network destination, and at what rate. */
+public class NetworkDestinationView implements NetworkDestinationViewMBean {
+    private static final Logger LOG = LoggerFactory.getLogger(NetworkDestinationView.class);
+    private final TimeStatisticImpl timeStatistic = new TimeStatisticImpl("networkEnqueue","network messages enqueued");
+
+    private final String name;
+    // Millisecond timestamp of the previous messageSent() call; negative until a message is recorded.
+    private long lastTime = -1;
+
+    public NetworkDestinationView(String name){
+       this.name = name;
+    }
+
+    /**
+     * Returns the name of this destination
+     */
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Resets the management counters. Synchronized with {@link #messageSent()} because MBean
+     * operations can be invoked from JMX threads while the bridge is recording messages.
+     */
+    @Override
+    public synchronized void resetStatistics() {
+        timeStatistic.reset();
+        lastTime = -1;
+    }
+
+    /** Returns the count held by the underlying time statistic. */
+    @Override
+    public long getCount() {
+        return timeStatistic.getCount();
+    }
+
+    /** Returns the underlying time statistic's average per second. */
+    @Override
+    public double getRate() {
+        return timeStatistic.getAveragePerSecond();
+    }
+
+    /** Records one message: adds the time since the previous message (0 for the first) to the statistic. */
+    public synchronized void messageSent(){
+        long currentTime = System.currentTimeMillis();
+        long elapsed = lastTime < 0 ? 0 : currentTime - lastTime;
+        timeStatistic.addTime(elapsed);
+        lastTime = currentTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationViewMBean.java
new file mode 100644
index 0000000..7164cdd
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/NetworkDestinationViewMBean.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.jmx;
+
+/** JMX management interface for the message statistics of a single network destination. */
+public interface NetworkDestinationViewMBean {
+    /**
+     * Returns the name of this destination.
+     */
+    @MBeanInfo("Name of this destination.")
+    String getName();
+
+    /**
+     * Resets the management counters.
+     */
+    @MBeanInfo("Resets statistics.")
+    void resetStatistics();
+
+    /**
+     * Returns the number of messages that have been sent to the destination.
+     *
+     * @return The number of messages that have been sent to the destination.
+     */
+    @MBeanInfo("Number of messages that have been sent to the destination.")
+    long getCount();
+
+    /**
+     * Returns the rate of messages that have been sent to the destination.
+     *
+     * @return The rate of messages that have been sent to the destination.
+     */
+    @MBeanInfo("rate of messages sent across the network destination.")
+    double getRate();
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index af10a94..bf0b4f6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -607,6 +607,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                                 } else {
                                     duplexInboundLocalBroker.oneway(message);
                                 }
+                                serviceInboundMessage(message);
                             }
                         } else {
                             switch (command.getDataStructureType()) {
@@ -985,6 +986,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
                                 sub.decrementOutstandingResponses();
                             }
                         }
+                        serviceOutbound(message);
                     } else {
                         LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
                     }
@@ -1612,4 +1614,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
         }
     }
 
+    protected void serviceOutbound(Message message){ // passes an outbound message to the bridge listener, if one is set
+        NetworkBridgeListener l = this.networkBridgeListener; // read the field once so the null check and the call use the same reference
+        if (l != null){
+            l.onOutboundMessage(this,message); // this bridge is handed over as the source of the event
+        }
+    }
+
+    protected void serviceInboundMessage(Message message){ // passes an inbound message to the bridge listener, if one is set
+        NetworkBridgeListener l = this.networkBridgeListener; // read the field once so the null check and the call use the same reference
+        if (l != null){
+            l.onInboundMessage(this,message); // this bridge is handed over as the source of the event
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
new file mode 100644
index 0000000..666f11e
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanBridgeDestination.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.management.ObjectName;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.AnnotatedMBean;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.NetworkDestinationView;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Lazily registers one JMX view per destination and direction for the messages crossing a network bridge. */
+public class MBeanBridgeDestination {
+    private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
+    private final BrokerService brokerService;
+    private final NetworkBridge bridge;
+    // Every registered view by ObjectName. Keyed by name rather than destination because one
+    // destination can own both an inbound and an outbound MBean, and close() must unregister both.
+    private final Map<ObjectName, NetworkDestinationView> registeredViews = new ConcurrentHashMap<ObjectName, NetworkDestinationView>();
+    private final Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
+    private final Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
+
+    /** Creates the tracker for {@code bridge}; views are registered with {@code brokerService}'s management context. */
+    public MBeanBridgeDestination(BrokerService brokerService, NetworkBridge bridge) {
+        this.brokerService = brokerService;
+        this.bridge = bridge;
+    }
+
+    /** Records an outbound message against the outbound view of its destination. */
+    public void onOutboundMessage(Message message) {
+        NetworkDestinationView view = findOrRegister(outboundDestinationViewMap, message.getDestination(), true);
+        if (view != null) {
+            view.messageSent();
+        }
+    }
+
+    /** Records an inbound message against the inbound view of its destination. */
+    public void onInboundMessage(Message message) {
+        NetworkDestinationView view = findOrRegister(inboundDestinationViewMap, message.getDestination(), false);
+        if (view != null) {
+            view.messageSent();
+        }
+    }
+
+    /**
+     * Looks the destination up in the given per-direction map, registering a new MBean on first use.
+     *
+     * @return the view, or null when registration failed (the failure is logged)
+     */
+    private NetworkDestinationView findOrRegister(Map<ActiveMQDestination, NetworkDestinationView> views, ActiveMQDestination destination, boolean outbound) {
+        NetworkDestinationView view = views.get(destination);
+        if (view == null) {
+            synchronized (registeredViews) {
+                view = views.get(destination); // re-check: another thread may have registered it meanwhile
+                if (view == null) {
+                    ObjectName bridgeObjectName = bridge.getMbeanObjectName();
+                    try {
+                        ObjectName objectName = outbound
+                                ? BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination)
+                                : BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
+                        NetworkDestinationView created = new NetworkDestinationView(destination.getPhysicalName());
+                        AnnotatedMBean.registerMBean(brokerService.getManagementContext(), created, objectName);
+                        registeredViews.put(objectName, created);
+                        views.put(destination, created);
+                        view = created;
+                    } catch (Exception e) {
+                        LOG.warn("Failed to register " + destination, e);
+                    }
+                }
+            }
+        }
+        return view;
+    }
+
+    /** Unregisters every destination MBean this instance registered and clears all maps; a no-op when JMX is disabled. */
+    public void close() {
+        if (!brokerService.isUseJmx()) {
+            return;
+        }
+        for (ObjectName objectName : registeredViews.keySet()) {
+            try {
+                brokerService.getManagementContext().unregisterMBean(objectName);
+            } catch (Throwable e) {
+                LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
+            }
+        }
+        registeredViews.clear();
+        outboundDestinationViewMap.clear();
+        inboundDestinationViewMap.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
index c9a9935..0481f3d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/MBeanNetworkListener.java
@@ -16,14 +16,17 @@
  */
 package org.apache.activemq.network;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.AnnotatedMBean;
 import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
 import org.apache.activemq.broker.jmx.NetworkBridgeView;
 import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
+import org.apache.activemq.command.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +37,7 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
     BrokerService brokerService;
     ObjectName connectorName;
     boolean createdByDuplex = false;
-
+    private Map<NetworkBridge,MBeanBridgeDestination> destinationObjectNameMap = new ConcurrentHashMap<NetworkBridge,MBeanBridgeDestination>();
     public MBeanNetworkListener(BrokerService brokerService, ObjectName connectorName) {
         this.brokerService = brokerService;
         this.connectorName = connectorName;
@@ -55,6 +58,8 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
             ObjectName objectName = createNetworkBridgeObjectName(bridge);
             AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, objectName);
             bridge.setMbeanObjectName(objectName);
+            MBeanBridgeDestination mBeanBridgeDestination = new MBeanBridgeDestination(brokerService,bridge);
+            destinationObjectNameMap.put(bridge,mBeanBridgeDestination);
             LOG.debug("registered: {} as: {}", bridge, objectName);
         } catch (Throwable e) {
             LOG.debug("Network bridge could not be registered in JMX: {}", e.getMessage(), e);
@@ -71,11 +76,17 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
             if (objectName != null) {
                 brokerService.getManagementContext().unregisterMBean(objectName);
             }
+            MBeanBridgeDestination mBeanBridgeDestination = destinationObjectNameMap.remove(bridge);
+            if (mBeanBridgeDestination != null){
+                mBeanBridgeDestination.close();
+            }
         } catch (Throwable e) {
             LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
         }
     }
 
+
+
     protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
         return BrokerMBeanSupport.createNetworkBridgeObjectName(connectorName, bridge.getRemoteAddress());
     }
@@ -83,4 +94,23 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
     public void setCreatedByDuplex(boolean createdByDuplex) {
         this.createdByDuplex = createdByDuplex;
     }
+
+
+
+    @Override
+    public void onOutboundMessage(NetworkBridge bridge,Message message) { // hands the message to the destination tracker stored for this bridge
+        MBeanBridgeDestination mBeanBridgeDestination = destinationObjectNameMap.get(bridge);
+        if (mBeanBridgeDestination != null){ // null when the map holds no tracker for this bridge; the message is then ignored
+            mBeanBridgeDestination.onOutboundMessage(message);
+        }
+    }
+
+    @Override
+    public void onInboundMessage(NetworkBridge bridge,Message message) { // hands the message to the destination tracker stored for this bridge
+        MBeanBridgeDestination mBeanBridgeDestination = destinationObjectNameMap.get(bridge);
+        if (mBeanBridgeDestination != null){ // null when the map holds no tracker for this bridge; the message is then ignored
+            mBeanBridgeDestination.onInboundMessage(message);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
index 7d49177..c30a999 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeListener.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.network;
 
+import org.apache.activemq.command.Message;
+
 /**
  * called when a bridge fails
  * 
@@ -38,4 +40,18 @@ public interface NetworkBridgeListener {
      */
     void onStop(NetworkBridge bridge);
 
+    /**
+     * Called when a message is forwarded over the network.
+     * @param bridge the bridge that forwarded the message
+     * @param message the message that was forwarded
+     */
+    void onOutboundMessage (NetworkBridge bridge,Message message);
+
+    /**
+     * Called when a message arrives over the network.
+     * @param bridge the bridge the message arrived on
+     * @param message the message that arrived
+     */
+    void onInboundMessage (NetworkBridge bridge,Message message);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/489f9296/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
index 14664d2..d3dbb50 100644
--- a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
+++ b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
@@ -154,7 +154,7 @@ public class SizeStatisticImpl extends StatisticImpl{
         buffer.append(Long.toString(minSize));
         buffer.append(" totalSize: ");
         buffer.append(Long.toString(totalSize));
-        buffer.append(" averageTime: ");
+        buffer.append(" averageSize: ");
         buffer.append(Double.toString(getAverageSize()));
         buffer.append(" averageTimeExMinMax: ");
         buffer.append(Double.toString(getAveragePerSecondExcludingMinMax()));