You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by hi...@apache.org on 2010/07/19 11:22:03 UTC

svn commit: r965396 - in /synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp: ./ util/

Author: hiranya
Date: Mon Jul 19 09:21:58 2010
New Revision: 965396

URL: http://svn.apache.org/viewvc?rev=965396&view=rev
Log:
Adding the connections view JMX monitor to the NHTTP transport (SYNAPSE-669). This view provides stats on newly opened connections and message sizes mediated by the transport.


Added:
    synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/ConnectionsView.java
    synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/ConnectionsViewMBean.java
    synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/NhttpMetricsCollector.java
Modified:
    synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
    synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
    synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
    synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java

Modified: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java?rev=965396&r1=965395&r2=965396&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java (original)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java Mon Jul 19 09:21:58 2010
@@ -57,6 +57,7 @@ import org.apache.http.params.DefaultedH
 import org.apache.http.params.HttpParams;
 import org.apache.http.protocol.*;
 import org.apache.synapse.transport.nhttp.debug.ClientConnectionDebug;
+import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
 import org.apache.synapse.commons.jmx.ThreadingView;
 
 import java.io.IOException;
@@ -91,7 +92,7 @@ public class ClientHandler implements NH
 
     private WorkerPool workerPool = null;
     /** the metrics collector */
-    private MetricsCollector metrics = null;
+    private NhttpMetricsCollector metrics = null;
 
     /** Array of content types for which warnings are logged if HTTP status code is 500. */
     private String[] warnOnHttp500;
@@ -125,7 +126,7 @@ public class ClientHandler implements NH
      * @param metrics statistics collection metrics
      */
     public ClientHandler(final ConfigurationContext cfgCtx, final HttpParams params,
-        final MetricsCollector metrics) {
+        final NhttpMetricsCollector metrics) {
         
         super();
         this.cfgCtx = cfgCtx;
@@ -182,6 +183,9 @@ public class ClientHandler implements NH
         if (log.isDebugEnabled() ) {
             log.debug("ClientHandler connected : " + conn);
         }
+
+        metrics.connected();
+
         // record connection creation time for debug logging
         conn.getContext().setAttribute(CONNECTION_CREATION_TIME, System.currentTimeMillis());
 
@@ -291,6 +295,7 @@ public class ClientHandler implements NH
         shutdownConnection(conn);
         context.removeAttribute(RESPONSE_SINK_BUFFER);
         context.removeAttribute(REQUEST_SOURCE_BUFFER);
+        metrics.disconnected();
     }
 
     /**

Modified: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java?rev=965396&r1=965395&r2=965396&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java (original)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListener.java Mon Jul 19 09:21:58 2010
@@ -53,6 +53,7 @@ import org.apache.synapse.commons.execut
 import org.apache.synapse.commons.evaluators.Parser;
 import org.apache.synapse.commons.evaluators.EvaluatorException;
 import org.apache.synapse.commons.evaluators.EvaluatorConstants;
+import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
 
 import javax.net.ssl.SSLContext;
 import javax.xml.stream.XMLStreamException;
@@ -105,7 +106,7 @@ public class HttpCoreNIOListener impleme
     /** JMX support */
     private TransportMBeanSupport mbeanSupport;
     /** Metrics collector for this transport */
-    private MetricsCollector metrics = new MetricsCollector();
+    private NhttpMetricsCollector metrics = null;
     /** state of the listener */
     private volatile int state = BaseConstants.STOPPED;
     /** The ServerHandler */
@@ -198,6 +199,7 @@ public class HttpCoreNIOListener impleme
         mbeanSupport
             = new TransportMBeanSupport(this, "nio-http" + (sslContext == null ? "" : "s"));
         mbeanSupport.register();
+        metrics = new NhttpMetricsCollector(true, sslContext != null);
 
         // create the priority based executor and parser
         param = transprtIn.getParameter(NhttpConstants.PRIORITY_CONFIG_FILE_NAME);
@@ -572,6 +574,7 @@ public class HttpCoreNIOListener impleme
         ioReactor = null;
         cfgCtx.getAxisConfiguration().getObserversList().remove(axisObserver);
         mbeanSupport.unregister();
+        metrics.destroy();
     }
 
     /**

Modified: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java?rev=965396&r1=965395&r2=965396&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java (original)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java Mon Jul 19 09:21:58 2010
@@ -59,6 +59,7 @@ import org.apache.synapse.transport.nhtt
 import org.apache.synapse.transport.nhttp.debug.ServerConnectionDebug;
 import org.apache.synapse.transport.nhttp.util.MessageFormatterDecoratorFactory;
 import org.apache.synapse.transport.nhttp.util.NhttpUtil;
+import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
 
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
@@ -92,7 +93,7 @@ public class HttpCoreNIOSender extends A
     /** JMX support */
     private TransportMBeanSupport mbeanSupport;
     /** Metrics collector for the sender */
-    private MetricsCollector metrics = new MetricsCollector();
+    private NhttpMetricsCollector metrics = null;
     /** state of the listener */
     private volatile int state = BaseConstants.STOPPED;
     /** The proxy host */
@@ -183,6 +184,7 @@ public class HttpCoreNIOSender extends A
             log.error("Error starting the IOReactor", e);
         }
 
+        metrics = new NhttpMetricsCollector(false, sslContext != null);
         handler = new ClientHandler(cfgCtx, params, metrics);
         final IOEventDispatch ioEventDispatch = getEventDispatch(
             handler, sslContext, sslIOSessionHandler, params, transportOut);
@@ -669,6 +671,7 @@ public class HttpCoreNIOSender extends A
             log.warn("Error shutting down IOReactor", e);
         }
         mbeanSupport.unregister();
+        metrics.destroy();
     }
 
     /**

Modified: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java?rev=965396&r1=965395&r2=965396&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java (original)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/ServerHandler.java Mon Jul 19 09:21:58 2010
@@ -51,6 +51,7 @@ import org.apache.synapse.commons.execut
 import org.apache.synapse.commons.jmx.ThreadingView;
 import org.apache.synapse.transport.nhttp.debug.ServerConnectionDebug;
 import org.apache.synapse.transport.nhttp.util.LatencyView;
+import org.apache.synapse.transport.nhttp.util.NhttpMetricsCollector;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -90,7 +91,7 @@ public class ServerHandler implements NH
     /** the thread pool to process requests */
     private WorkerPool workerPool = null;
     /** the metrics collector */
-    private MetricsCollector metrics = null;
+    private NhttpMetricsCollector metrics = null;
     
     /** keeps track of the connection that are alive in the system */
     private volatile List<NHttpServerConnection> activeConnections = null;
@@ -108,7 +109,7 @@ public class ServerHandler implements NH
     public static final String SERVER_CONNECTION_DEBUG = "synapse.server-connection-debug";
 
     public ServerHandler(final ConfigurationContext cfgCtx, final HttpParams params,
-        final boolean isHttps, final MetricsCollector metrics,
+        final boolean isHttps, final NhttpMetricsCollector metrics,
         Parser parser, PriorityExecutor executor) {
         super();
         this.cfgCtx = cfgCtx;
@@ -343,8 +344,8 @@ public class ServerHandler implements NH
      * HttpProcessor and submits it to be sent out. Re-Throws exceptions, after closing connections
      * @param conn the connection being processed
      * @param response the response to commit over the connection
-     * @throws IOException
-     * @throws HttpException
+     * @throws IOException if an IO error occurs while sending the response
+     * @throws HttpException if a HTTP protocol violation occurs while sending the response
      */
     public void commitResponse(final NHttpServerConnection conn,
         final HttpResponse response) throws IOException, HttpException {
@@ -386,6 +387,9 @@ public class ServerHandler implements NH
         if (log.isTraceEnabled()) {
             log.trace("New incoming connection");
         }
+
+        metrics.connected();
+
         // record connection creation time for debug logging
         conn.getContext().setAttribute(CONNECTION_CREATION_TIME, System.currentTimeMillis());
         if (log.isDebugEnabled()) {
@@ -433,6 +437,7 @@ public class ServerHandler implements NH
         if (log.isTraceEnabled()) {
             log.trace("Connection closed");
         }
+        metrics.disconnected();
     }
 
     public void markActiveConnectionsToBeClosed() {

Added: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/ConnectionsView.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/ConnectionsView.java?rev=965396&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/ConnectionsView.java (added)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/ConnectionsView.java Mon Jul 19 09:21:58 2010
@@ -0,0 +1,265 @@
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one
+*  or more contributor license agreements.  See the NOTICE file
+*  distributed with this work for additional information
+*  regarding copyright ownership.  The ASF licenses this file
+*  to you under the Apache License, Version 2.0 (the
+*  "License"); you may not use this file except in compliance
+*  with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing,
+*  software distributed under the License is distributed on an
+*   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+*  KIND, either express or implied.  See the License for the
+*  specific language governing permissions and limitations
+*  under the License.
+*/
+
+package org.apache.synapse.transport.nhttp.util;
+
+import org.apache.synapse.commons.jmx.MBeanRegistrar;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ConnectionsView MBean can be used to collect and monitor statistics on HTTP connections
+ * created by the NHTTP transport. Connection statistics can be divided into two categories,
+ * namely short term data and long term data. Short term data is related to the last 15
+ * minutes of execution and they are updated every minute. Long term data is related to
+ * the last 24 hours of execution and they get updated every 5 minutes.  In addition to the
+ * connection statistics this MBean also provides information on the request and response
+ * sizes received over the HTTP connections. All messages are divided into six categories
+ * based on their sizes and the resulting counts are made available as a table. 
+ */
+public class ConnectionsView implements ConnectionsViewMBean {
+
+    private static final String NHTTP_CONNECTIONS = "NhttpConnections";
+
+    // Bucket definitions
+    private static final int LESS_THAN_1K       = 0;
+    private static final int LESS_THAN_10K      = 1;
+    private static final int LESS_THAN_100K     = 2;
+    private static final int LESS_THAN_1M       = 3;
+    private static final int LESS_THAN_10M      = 4;
+    private static final int GREATER_THAN_10M   = 5;
+
+    private static final int SHORT_DATA_COLLECTION_PERIOD = 60;
+    private static final int LONG_DATA_COLLECTION_PERIOD = 60 * 5;
+
+    private static final int SAMPLES_PER_HOUR = (60 * 60)/LONG_DATA_COLLECTION_PERIOD;
+
+    private Queue<Integer> shortTermDataQueue = new LinkedList<Integer>();
+    private Queue<Integer> longTermDataQueue = new LinkedList<Integer>();
+
+    private AtomicInteger activeConnections = new AtomicInteger(0);
+    private AtomicInteger shortTermOpenedConnections = new AtomicInteger(0);
+    private AtomicInteger longTermOpenedConnections = new AtomicInteger(0);
+
+    // The array length must be equal to the number of buckets
+    private AtomicInteger[] requestSizeCounters = new AtomicInteger[6];
+    private AtomicInteger[] responseSizeCounters = new AtomicInteger[6];
+
+    private Date resetTime = Calendar.getInstance().getTime();
+
+    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+    private String name;
+
+    public ConnectionsView(String name) {
+        this.name = name;
+
+        initCounters(requestSizeCounters);
+        initCounters(responseSizeCounters);
+
+        Runnable task = new Runnable() {
+            public void run() {
+                // We only need historical data for the last 15 minutes
+                // Therefore no need to keep data older than that...
+                if (shortTermDataQueue.size() == 15) {
+                    shortTermDataQueue.remove();
+                }
+                shortTermDataQueue.offer(shortTermOpenedConnections.getAndSet(0));
+            }
+        };
+        // Delay the timer by 1 minute to prevent the task from starting immediately
+        scheduler.scheduleAtFixedRate(task, SHORT_DATA_COLLECTION_PERIOD,
+                SHORT_DATA_COLLECTION_PERIOD, TimeUnit.SECONDS);
+
+        Runnable longTermCollector = new Runnable() {
+            public void run() {
+                // We only need historical data for the last 24 hours
+                // Therefore no need to keep data older than that...
+                if (longTermDataQueue.size() == 24 * SAMPLES_PER_HOUR) {
+                    longTermDataQueue.remove();
+                }
+                longTermDataQueue.offer(longTermOpenedConnections.getAndSet(0));
+            }
+        };
+        scheduler.scheduleAtFixedRate(longTermCollector, LONG_DATA_COLLECTION_PERIOD,
+                LONG_DATA_COLLECTION_PERIOD, TimeUnit.SECONDS);
+        MBeanRegistrar.getInstance().registerMBean(this, NHTTP_CONNECTIONS, name);
+    }
+
+    public void destroy() {
+        MBeanRegistrar.getInstance().unRegisterMBean(NHTTP_CONNECTIONS, name);
+        scheduler.shutdownNow();
+    }
+
+    private void initCounters(AtomicInteger[] counters) {
+        for (int i = 0; i < counters.length; i++) {
+            if (counters[i] == null) {
+                counters[i] = new AtomicInteger(0);
+            } else {
+                counters[i].set(0);
+            }
+        }
+    }
+
+    protected void connected() {
+        activeConnections.incrementAndGet();
+        shortTermOpenedConnections.incrementAndGet();
+        longTermOpenedConnections.incrementAndGet();
+    }
+
+    protected void disconnected() {
+        activeConnections.decrementAndGet();
+    }
+
+    protected void notifyMessageSize(long size, boolean isRequest) {
+        // This logic gets executed for each and every transaction. For a typical
+        // mediation scenario this method will be called 4 times. Therefore I'm using
+        // arrays of integers to keep the overhead down to a minimum. Since the number
+        // of buckets is 6, this can be easily managed without using a slow data structure
+        // like a HashMap. This approach guarantees O(1) complexity.
+
+        AtomicInteger[] counters = isRequest ? requestSizeCounters : responseSizeCounters;
+
+        if (size < 1024) {
+            counters[LESS_THAN_1K].incrementAndGet();
+        } else if (size < 10240) {
+            counters[LESS_THAN_10K].incrementAndGet();
+        } else if (size < 102400) {
+            counters[LESS_THAN_100K].incrementAndGet();
+        } else if (size < 1048576) {
+            counters[LESS_THAN_1M].incrementAndGet();
+        } else if (size < 10485760) {
+            counters[LESS_THAN_10M].incrementAndGet();
+        } else {
+            counters[GREATER_THAN_10M].incrementAndGet();
+        }
+    }
+
+    public int getActiveConnections() {
+        return activeConnections.get();
+    }
+
+    public int getLastMinuteConnections() {
+        return getTotalConnections(1);
+    }
+
+    public int getLast5MinuteConnections() {
+        return getTotalConnections(5);
+    }
+
+    public int getLast15MinuteConnections() {
+        return getTotalConnections(15);
+    }
+
+    public int getLastHourConnections() {
+        return getTotalConnectionsByHour(1);
+    }
+
+    public int getLast8HourConnections() {
+        return getTotalConnectionsByHour(8);
+    }
+
+    public int getLast24HourConnections() {
+        return getTotalConnectionsByHour(24);
+    }
+
+    public Map getRequestSizesMap() {
+        return getCountersMap(requestSizeCounters);
+    }
+
+    public Map getResponseSizesMap() {
+        return getCountersMap(responseSizeCounters);
+    }
+
+    public Date getLastResetTime() {
+        return resetTime;
+    }
+
+    private Map<String,Integer> getCountersMap(AtomicInteger[] counters) {
+        // This ensures that keys are returned in the same order we insert them
+        // Provides better readability in the JMX consoles
+        Map<String,Integer> map = new LinkedHashMap<String,Integer>();
+        map.put("< 1 K", counters[LESS_THAN_1K].get());
+        map.put("< 10 K", counters[LESS_THAN_10K].get());
+        map.put("< 100 K", counters[LESS_THAN_100K].get());
+        map.put("< 1 M", counters[LESS_THAN_1M].get());
+        map.put("< 10 M", counters[LESS_THAN_10M].get());
+        map.put("> 10 M", counters[GREATER_THAN_10M].get());
+        return map;
+    }
+
+    public void reset() {
+        initCounters(requestSizeCounters);
+        initCounters(responseSizeCounters);
+        shortTermDataQueue.clear();
+        longTermDataQueue.clear();
+        resetTime = Calendar.getInstance().getTime();
+    }
+
+    /**
+     * Return the number of total connections opened during last 'n' munites
+     * of execution
+     *
+     * @param n Number of minutes in the execution history
+     * @return The number of connections opened
+     */
+    private int getTotalConnections(int n) {
+        int sum = 0;
+        Integer[] array = shortTermDataQueue.toArray(new Integer[shortTermDataQueue.size()]);
+
+        if (n > array.length) {
+            for (int i = 0; i < array.length; i++) {
+                sum += array[i];
+            }
+        } else {
+            for (int i = 0; i < n; i++) {
+                sum += array[array.length - 1 - i];
+            }
+        }
+        return sum;
+    }
+
+    /**
+     * Return the number of total connections opened during last 'n' hours
+     * of execution
+     *
+     * @param n Number of hours in the execution history
+     * @return The number of connections opened
+     */
+    private int getTotalConnectionsByHour(int n) {
+        int samples = n * SAMPLES_PER_HOUR;
+        int sum = 0;
+        Integer[] array = longTermDataQueue.toArray(new Integer[longTermDataQueue.size()]);
+
+        if (samples > array.length) {
+            for (int i = 0; i < array.length; i++) {
+                sum += array[i];
+            }
+        } else {
+            for (int i = 0; i < samples; i++) {
+                sum += array[array.length - 1 - i];
+            }
+        }
+        return sum;
+    }
+}

Added: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/ConnectionsViewMBean.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/ConnectionsViewMBean.java?rev=965396&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/ConnectionsViewMBean.java (added)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/ConnectionsViewMBean.java Mon Jul 19 09:21:58 2010
@@ -0,0 +1,40 @@
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one
+*  or more contributor license agreements.  See the NOTICE file
+*  distributed with this work for additional information
+*  regarding copyright ownership.  The ASF licenses this file
+*  to you under the Apache License, Version 2.0 (the
+*  "License"); you may not use this file except in compliance
+*  with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing,
+*  software distributed under the License is distributed on an
+*   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+*  KIND, either express or implied.  See the License for the
+*  specific language governing permissions and limitations
+*  under the License.
+*/
+
+package org.apache.synapse.transport.nhttp.util;
+
+import java.util.Map;
+import java.util.Date;
+
+public interface ConnectionsViewMBean {
+
+    public int getActiveConnections();
+    public int getLastMinuteConnections();
+    public int getLast5MinuteConnections();
+    public int getLast15MinuteConnections();
+    public int getLastHourConnections();
+    public int getLast8HourConnections();
+    public int getLast24HourConnections();
+    public Map getRequestSizesMap();
+    public Map getResponseSizesMap();
+    public Date getLastResetTime();
+
+    public void reset();
+    
+}

Added: synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/NhttpMetricsCollector.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/NhttpMetricsCollector.java?rev=965396&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/NhttpMetricsCollector.java (added)
+++ synapse/trunk/java/modules/transports/core/nhttp/src/main/java/org/apache/synapse/transport/nhttp/util/NhttpMetricsCollector.java Mon Jul 19 09:21:58 2010
@@ -0,0 +1,73 @@
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one
+*  or more contributor license agreements.  See the NOTICE file
+*  distributed with this work for additional information
+*  regarding copyright ownership.  The ASF licenses this file
+*  to you under the Apache License, Version 2.0 (the
+*  "License"); you may not use this file except in compliance
+*  with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing,
+*  software distributed under the License is distributed on an
+*   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+*  KIND, either express or implied.  See the License for the
+*  specific language governing permissions and limitations
+*  under the License.
+*/
+
+package org.apache.synapse.transport.nhttp.util;
+
+import org.apache.axis2.transport.base.MetricsCollector;
+
+/**
+ * <p>This simple extension of the Axis2 transport MetricsCollector implementation,
+ * maintains a ConnectionsView instance, which is updated based on the events fired
+ * by the NHTTP transport implementation. In addition to the usual events handled
+ * by the Axis2 MetricsCollector, this implementation handles two new events:</p>
+ * <ul>
+ *    <li>connected (this should get called whenever a new connection is created)</li>
+ *    <li>disconnected (this should get called whenever an existing connection is closed)</li>
+ * <ul>
+ * <p>These new events are used to update the ConnectionsView at runtime.</p>
+ */
+public class NhttpMetricsCollector extends MetricsCollector {
+
+    private ConnectionsView view;
+    private boolean listener;
+
+    public NhttpMetricsCollector(boolean listener, boolean isHttps) {
+        this.listener = listener;
+        String name = "http" + (isHttps ? "s" : "") + "-" + (listener ? "listener" : "sender");
+        this.view = new ConnectionsView(name);
+    }
+
+    public void destroy() {
+        view.destroy();
+    }
+
+    public void connected() {
+        view.connected();
+    }
+
+    public void disconnected() {
+        view.disconnected();
+    }
+
+    @Override
+    public void notifyReceivedMessageSize(long l) {
+        super.notifyReceivedMessageSize(l);
+        if (l > 0) {
+            view.notifyMessageSize(l, listener);
+        }
+    }
+
+    @Override
+    public void notifySentMessageSize(long l) {
+        super.notifySentMessageSize(l);
+        if (l > 0) {
+            view.notifyMessageSize(l, !listener);
+        }
+    }
+}