You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:48 UTC

[10/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java
index 6b79fb4..45c353c 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClient.java
@@ -17,6 +17,26 @@
  */
 package com.alibaba.jstorm.message.netty;
 
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.DisruptorQueue;
+import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.common.metric.*;
+import com.alibaba.jstorm.metric.*;
+import com.alibaba.jstorm.utils.JStormServerUtils;
+import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.NetWorkUtils;
+import com.codahale.metrics.health.HealthCheck;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.HashSet;
@@ -29,35 +49,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.DisruptorQueue;
-import backtype.storm.utils.Utils;
-
-import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.common.metric.Histogram;
-import com.alibaba.jstorm.common.metric.Meter;
-import com.alibaba.jstorm.common.metric.QueueGauge;
-import com.alibaba.jstorm.metric.JStormHealthCheck;
-import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.metric.MetricDef;
-import com.alibaba.jstorm.utils.JStormServerUtils;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.NetWorkUtils;
-import com.codahale.metrics.health.HealthCheck;
-
 class NettyClient implements IConnection {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(NettyClient.class);
+    private static final Logger LOG = LoggerFactory.getLogger(NettyClient.class);
+
     protected String name;
 
     protected final int max_retries;
@@ -84,9 +78,11 @@ class NettyClient implements IConnection {
 
     protected String address;
     // doesn't use timer, due to competition
-    protected Histogram sendTimer;
-    protected Histogram batchSizeHistogram;
-    protected Meter     sendSpeed;
+    protected AsmHistogram sendTimer;
+    protected AsmHistogram batchSizeHistogram;
+    protected AsmMeter sendSpeed;
+    protected static AsmMeter totalSendSpeed = (AsmMeter) JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName(
+            MetricDef.NETTY_CLI_SEND_SPEED, MetricType.METER), new AsmMeter());
 
     protected ReconnectRunnable reconnector;
     protected ChannelFactory clientChannelFactory;
@@ -94,19 +90,19 @@ class NettyClient implements IConnection {
     protected Set<Channel> closingChannel;
 
     protected AtomicBoolean isConnecting = new AtomicBoolean(false);
-    
+
     protected NettyConnection nettyConnection;
-    
+
     protected Map stormConf;
-    
+
     protected boolean connectMyself;
 
     protected Object channelClosing = new Object();
 
+    protected boolean enableNettyMetrics;
+
     @SuppressWarnings("rawtypes")
-    NettyClient(Map storm_conf, ChannelFactory factory,
-            ScheduledExecutorService scheduler, String host, int port,
-            ReconnectRunnable reconnector) {
+    NettyClient(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) {
         this.stormConf = storm_conf;
         this.factory = factory;
         this.scheduler = scheduler;
@@ -116,34 +112,21 @@ class NettyClient implements IConnection {
         channelRef = new AtomicReference<Channel>(null);
         being_closed = new AtomicBoolean(false);
         pendings = new AtomicLong(0);
-        
+
         nettyConnection = new NettyConnection();
-        nettyConnection.setClientPort(NetWorkUtils.ip(), 
-                ConfigExtension.getLocalWorkerPort(storm_conf));
+        nettyConnection.setClientPort(NetWorkUtils.ip(), ConfigExtension.getLocalWorkerPort(storm_conf));
         nettyConnection.setServerPort(host, port);
 
         // Configure
-        buffer_size =
-                Utils.getInt(storm_conf
-                        .get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
-        max_retries =
-                Math.min(30, Utils.getInt(storm_conf
-                        .get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
-        base_sleep_ms =
-                Utils.getInt(storm_conf
-                        .get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
-        max_sleep_ms =
-                Utils.getInt(storm_conf
-                        .get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+        buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
+        base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+        max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
 
         timeoutMs = ConfigExtension.getNettyPendingBufferTimeout(storm_conf);
-        MAX_SEND_PENDING =
-                (int) ConfigExtension.getNettyMaxSendPending(storm_conf);
+        MAX_SEND_PENDING = (int) ConfigExtension.getNettyMaxSendPending(storm_conf);
 
-        this.messageBatchSize =
-                Utils.getInt(
-                        storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE),
-                        262144);
+        this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
         messageBatchRef = new AtomicReference<MessageBatch>();
 
         // Start the connection attempt.
@@ -152,56 +135,62 @@ class NettyClient implements IConnection {
         connectMyself = isConnectMyself(stormConf, host, port);
 
         address = JStormServerUtils.getName(host, port);
-        
-        if (connectMyself == false) {
+
+        this.enableNettyMetrics = MetricUtils.isEnableNettyMetrics(storm_conf);
+        LOG.info("** enable netty metrics: {}", this.enableNettyMetrics);
+        if (!connectMyself) {
             registerMetrics();
         }
         closingChannel = new HashSet<Channel>();
     }
-    
+
     public void registerMetrics() {
-        sendTimer =
-                JStormMetrics.registerWorkerHistogram(
-                        MetricDef.NETTY_CLI_SEND_TIME, nettyConnection.toString());
-        batchSizeHistogram =
-                JStormMetrics.registerWorkerHistogram(
-                        MetricDef.NETTY_CLI_BATCH_SIZE, nettyConnection.toString());
-        sendSpeed = JStormMetrics.registerWorkerMeter(MetricDef.NETTY_CLI_SEND_SPEED, 
-                nettyConnection.toString());
-
-        CacheGaugeHealthCheck cacheGauge =
-                new CacheGaugeHealthCheck(messageBatchRef,
-                        MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString());
-        JStormMetrics.registerWorkerGauge(cacheGauge,
-                MetricDef.NETTY_CLI_CACHE_SIZE, nettyConnection.toString());
-        JStormHealthCheck.registerWorkerHealthCheck(
-                MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString(), cacheGauge);
-
-        JStormMetrics.registerWorkerGauge(
-                new com.codahale.metrics.Gauge<Double>() {
+        if (this.enableNettyMetrics) {
+            sendTimer = (AsmHistogram) JStormMetrics.registerNettyMetric(
+                    MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_TIME, nettyConnection),
+                            MetricType.HISTOGRAM),
+                    new AsmHistogram());
+            batchSizeHistogram = (AsmHistogram) JStormMetrics.registerNettyMetric(
+                    MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_BATCH_SIZE, nettyConnection),
+                            MetricType.HISTOGRAM),
+                    new AsmHistogram());
+            sendSpeed = (AsmMeter) JStormMetrics.registerNettyMetric(MetricUtils.nettyMetricName(
+                    AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_SPEED, nettyConnection), MetricType.METER), new AsmMeter());
+
+            CacheGaugeHealthCheck cacheGauge = new CacheGaugeHealthCheck(messageBatchRef,
+                    MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString());
+            JStormMetrics.registerNettyMetric(MetricUtils
+                            .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_CACHE_SIZE, nettyConnection), MetricType.GAUGE),
+                    new AsmGauge(cacheGauge));
+
+            JStormMetrics.registerNettyMetric(MetricUtils
+                            .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_PENDING, nettyConnection), MetricType.GAUGE),
+                    new AsmGauge(new com.codahale.metrics.Gauge<Double>() {
+                        @Override
+                        public Double getValue() {
+                            return ((Long) pendings.get()).doubleValue();
+                        }
+                    }));
 
-                    @Override
-                    public Double getValue() {
-                        return ((Long) pendings.get()).doubleValue();
-                    }
-                }, MetricDef.NETTY_CLI_SEND_PENDING, nettyConnection.toString());
-        
-        JStormHealthCheck.registerWorkerHealthCheck(
-                MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString(), 
+            JStormHealthCheck.registerWorkerHealthCheck(MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString(),
+                    cacheGauge);
+        }
+
+        JStormHealthCheck.registerWorkerHealthCheck(MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString(),
                 new HealthCheck() {
-                    HealthCheck.Result healthy = HealthCheck.Result.healthy();
-                    HealthCheck.Result unhealthy = HealthCheck.Result.unhealthy
-                            ("NettyConnection " + nettyConnection.toString() + " is broken.");
+                    Result healthy = Result.healthy();
+                    Result unhealthy = Result
+                            .unhealthy("NettyConnection " + nettyConnection.toString() + " is broken.");
+
                     @Override
                     protected Result check() throws Exception {
-                        // TODO Auto-generated method stub
                         if (isChannelReady() == null) {
                             return unhealthy;
-                        }else {
+                        } else {
                             return healthy;
                         }
                     }
-                    
+
                 });
     }
 
@@ -216,23 +205,28 @@ class NettyClient implements IConnection {
         bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, stormConf));
         reconnect();
     }
-    
+
     public boolean isConnectMyself(Map conf, String host, int port) {
         String localIp = NetWorkUtils.ip();
         String remoteIp = NetWorkUtils.host2Ip(host);
         int localPort = ConfigExtension.getLocalWorkerPort(conf);
-        
-        if (localPort == port && 
-                localIp.equals(remoteIp)) {
+
+        if (localPort == port && localIp.equals(remoteIp)) {
             return true;
         }
-        
+
         return false;
     }
 
+    public void notifyInterestChanged(Channel channel) {
+        if (channel.isWritable()) {
+            MessageBatch messageBatch = messageBatchRef.getAndSet(null);
+            flushRequest(channel, messageBatch);
+        }
+    }
+
     /**
      * The function can't be synchronized, otherwise it will be deadlock
-     * 
      */
     public void doReconnect() {
         if (channelRef.get() != null) {
@@ -255,12 +249,10 @@ class NettyClient implements IConnection {
         }
 
         long sleepMs = getSleepTimeMs();
-        LOG.info("Reconnect ... [{}], {}, sleep {}ms", retries.get(), name,
-                sleepMs);
+        LOG.info("Reconnect ... [{}], {}, sleep {}ms", retries.get(), name, sleepMs);
         ChannelFuture future = bootstrap.connect(remote_addr);
         future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
+            public void operationComplete(ChannelFuture future) throws Exception {
                 isConnecting.set(false);
                 Channel channel = future.getChannel();
                 if (future.isSuccess()) {
@@ -269,17 +261,12 @@ class NettyClient implements IConnection {
                     setChannel(channel);
                     // handleResponse();
                 } else {
-                    LOG.info(
-                            "Failed to reconnect ... [{}], {}, channel = {}, cause = {}",
-                            retries.get(), name, channel, future.getCause());
+                    LOG.info("Failed to reconnect ... [{}], {}, channel = {}, cause = {}", retries.get(), name, channel, future.getCause());
                     reconnect();
                 }
             }
         });
         JStormUtils.sleepMs(sleepMs);
-
-        return;
-
     }
 
     public void reconnect() {
@@ -290,7 +277,6 @@ class NettyClient implements IConnection {
      * # of milliseconds to wait per exponential back-off policy
      */
     private int getSleepTimeMs() {
-
         int sleepMs = base_sleep_ms * retries.incrementAndGet();
         if (sleepMs > 1000) {
             sleepMs = 1000;
@@ -310,7 +296,7 @@ class NettyClient implements IConnection {
     public void send(TaskMessage message) {
         LOG.warn("Should be overload");
     }
-    
+
     Channel isChannelReady() {
         Channel channel = channelRef.get();
         if (channel == null) {
@@ -325,26 +311,28 @@ class NettyClient implements IConnection {
         return channel;
     }
 
-    protected synchronized void flushRequest(Channel channel,
-            final MessageBatch requests) {
+    protected synchronized void flushRequest(Channel channel, final MessageBatch requests) {
         if (requests == null || requests.isEmpty())
             return;
 
-        Double batchSize = Double.valueOf(requests.getEncoded_length());
-        batchSizeHistogram.update(batchSize);
+        Long batchSize = (long) requests.getEncoded_length();
+        if (batchSizeHistogram != null) {
+            batchSizeHistogram.update(batchSize);
+        }
         pendings.incrementAndGet();
-        sendSpeed.update(batchSize);
+        if (sendSpeed != null) {
+            sendSpeed.update(batchSize);
+        }
+        totalSendSpeed.update(batchSize);
         ChannelFuture future = channel.write(requests);
         future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
+            public void operationComplete(ChannelFuture future) throws Exception {
 
                 pendings.decrementAndGet();
                 if (!future.isSuccess()) {
                     Channel channel = future.getChannel();
                     if (isClosed() == false) {
-                        LOG.info("Failed to send requests to " + name + ": "
-                                + channel.toString() + ":", future.getCause());
+                        LOG.info("Failed to send requests to " + name + ": " + channel.toString() + ":", future.getCause());
                     }
 
                     if (null != channel) {
@@ -357,32 +345,29 @@ class NettyClient implements IConnection {
             }
         });
     }
-    
+
     public void unregisterMetrics() {
-        JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_SEND_TIME,
-                nettyConnection.toString());
-        JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_BATCH_SIZE,
-                nettyConnection.toString());
-        JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_SEND_PENDING,
-                nettyConnection.toString());
-        JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_CACHE_SIZE,
-                nettyConnection.toString());
-        JStormMetrics.unregisterWorkerMetric(MetricDef.NETTY_CLI_SEND_SPEED, 
-                nettyConnection.toString());
-
-        JStormHealthCheck
-                .unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_CACHE_SIZE
-                        + ":" + nettyConnection.toString());
-        
-        JStormHealthCheck.unregisterWorkerHealthCheck(
-                MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString()); 
+        if (this.enableNettyMetrics) {
+            JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName(
+                    AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_TIME, nettyConnection), MetricType.HISTOGRAM));
+            JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName(
+                    AsmMetric.mkName(MetricDef.NETTY_CLI_BATCH_SIZE, nettyConnection), MetricType.HISTOGRAM));
+            JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName(
+                    AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_PENDING, nettyConnection), MetricType.GAUGE));
+            JStormMetrics.unregisterNettyMetric(MetricUtils
+                    .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_CACHE_SIZE, nettyConnection), MetricType.GAUGE));
+            JStormMetrics.unregisterNettyMetric(MetricUtils
+                    .nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_CLI_SEND_SPEED, nettyConnection), MetricType.METER));
+        }
+        JStormHealthCheck.unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_CACHE_SIZE + ":" + nettyConnection.toString());
+
+        JStormHealthCheck.unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_CONNECTION + ":" + nettyConnection.toString());
     }
 
     /**
      * gracefully close this client.
-     * 
-     * We will send all existing requests, and then invoke close_n_release()
-     * method
+     * <p/>
+     * We will send all existing requests, and then invoke close_n_release() method
      */
     public void close() {
         LOG.info("Close netty connection to {}", name());
@@ -391,7 +376,7 @@ class NettyClient implements IConnection {
             return;
         }
 
-        if (connectMyself == false) {
+        if (!connectMyself) {
             unregisterMetrics();
         }
 
@@ -410,17 +395,13 @@ class NettyClient implements IConnection {
         final long timeoutMilliSeconds = 10 * 1000;
         final long start = System.currentTimeMillis();
 
-        LOG.info("Waiting for pending batchs to be sent with " + name()
-                + "..., timeout: {}ms, pendings: {}", timeoutMilliSeconds,
-                pendings.get());
+        LOG.info("Waiting for pending batchs to be sent with " + name() + "..., timeout: {}ms, pendings: {}", timeoutMilliSeconds, pendings.get());
 
         while (pendings.get() != 0) {
             try {
                 long delta = System.currentTimeMillis() - start;
                 if (delta > timeoutMilliSeconds) {
-                    LOG.error(
-                            "Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent",
-                            name(), pendings.get());
+                    LOG.error("Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent", name(), pendings.get());
                     break;
                 }
                 Thread.sleep(1000); // sleep 1s
@@ -445,7 +426,7 @@ class NettyClient implements IConnection {
 
     /**
      * Avoid channel double close
-     * 
+     *
      * @param channel
      */
     void closeChannel(final Channel channel) {
@@ -461,8 +442,7 @@ class NettyClient implements IConnection {
         LOG.debug(channel.toString() + " begin to closed");
         ChannelFuture closeFuture = channel.close();
         closeFuture.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
+            public void operationComplete(ChannelFuture future) throws Exception {
 
                 synchronized (channelClosing) {
                     closingChannel.remove(channel);
@@ -501,14 +481,9 @@ class NettyClient implements IConnection {
             retries.set(0);
         }
 
-        final String oldLocalAddres =
-                (oldChannel == null) ? "null" : oldChannel.getLocalAddress()
-                        .toString();
-        String newLocalAddress =
-                (newChannel == null) ? "null" : newChannel.getLocalAddress()
-                        .toString();
-        LOG.info("Use new channel {} replace old channel {}", newLocalAddress,
-                oldLocalAddres);
+        final String oldLocalAddres = (oldChannel == null) ? "null" : oldChannel.getLocalAddress().toString();
+        String newLocalAddress = (newChannel == null) ? "null" : newChannel.getLocalAddress().toString();
+        LOG.info("Use new channel {} replace old channel {}", newLocalAddress, oldLocalAddres);
 
         // avoid one netty client use too much connection, close old one
         if (oldChannel != newChannel && oldChannel != null) {
@@ -555,60 +530,56 @@ class NettyClient implements IConnection {
 
     @Override
     public Object recv(Integer taskId, int flags) {
-        throw new UnsupportedOperationException(
-                "recvTask: Client connection should not receive any messages");
+        throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages");
     }
 
     @Override
     public void registerQueue(Integer taskId, DisruptorQueue recvQueu) {
-        throw new UnsupportedOperationException(
-                "recvTask: Client connection should not receive any messages");
+        throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages");
     }
 
     @Override
     public void enqueue(TaskMessage message) {
-        throw new UnsupportedOperationException(
-                "recvTask: Client connection should not receive any messages");
+        throw new UnsupportedOperationException("recvTask: Client connection should not receive any messages");
     }
 
-    public static class CacheGaugeHealthCheck extends HealthCheck implements
-            com.codahale.metrics.Gauge<Double> {
+    public static class CacheGaugeHealthCheck extends HealthCheck implements com.codahale.metrics.Gauge<Double> {
 
         AtomicReference<MessageBatch> messageBatchRef;
         String name;
         Result healthy;
 
-        public CacheGaugeHealthCheck(
-                AtomicReference<MessageBatch> messageBatchRef, String name) {
+        public CacheGaugeHealthCheck(AtomicReference<MessageBatch> messageBatchRef, String name) {
             this.messageBatchRef = messageBatchRef;
             this.name = name;
-            this.healthy = HealthCheck.Result.healthy();
+            this.healthy = Result.healthy();
         }
 
         @Override
         public Double getValue() {
-            // TODO Auto-generated method stub
             MessageBatch messageBatch = messageBatchRef.get();
             if (messageBatch == null) {
                 return 0.0;
             } else {
-                Double ret = (double) messageBatch.getEncoded_length();
-                return ret;
+                return (double) messageBatch.getEncoded_length();
             }
 
         }
 
         @Override
         protected Result check() throws Exception {
-            // TODO Auto-generated method stub
             Double size = getValue();
             if (size > 8 * JStormUtils.SIZE_1_M) {
-                return HealthCheck.Result.unhealthy(name
-                        + QueueGauge.QUEUE_IS_FULL);
+                return Result.unhealthy(name + QueueGauge.QUEUE_IS_FULL);
             } else {
                 return healthy;
             }
         }
 
     }
+
+    @Override
+    public boolean available() {
+        return (isChannelReady() != null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java
index 1d582ba..9b58063 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientAsync.java
@@ -17,31 +17,29 @@
  */
 package com.alibaba.jstorm.message.netty;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.utils.IntervalCheck;
 import com.alibaba.jstorm.utils.JStormServerUtils;
 import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 class NettyClientAsync extends NettyClient {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(NettyClientAsync.class);
+    private static final Logger LOG = LoggerFactory.getLogger(NettyClientAsync.class);
     public static final String PREFIX = "Netty-Client-";
 
     // when batch buffer size is more than BATCH_THREASHOLD_WARN
@@ -54,7 +52,6 @@ class NettyClientAsync extends NettyClient {
     protected final boolean blockSend;
 
     boolean isDirectSend(Map conf) {
-
         if (JStormServerUtils.isOnePending(conf) == true) {
             return true;
         }
@@ -71,22 +68,15 @@ class NettyClientAsync extends NettyClient {
     }
 
     @SuppressWarnings("rawtypes")
-    NettyClientAsync(Map storm_conf, ChannelFactory factory,
-            ScheduledExecutorService scheduler, String host, int port,
-            ReconnectRunnable reconnector) {
+    NettyClientAsync(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) {
         super(storm_conf, factory, scheduler, host, port, reconnector);
 
-        BATCH_THREASHOLD_WARN =
-                ConfigExtension.getNettyBufferThresholdSize(storm_conf);
-
+        BATCH_THREASHOLD_WARN = ConfigExtension.getNettyBufferThresholdSize(storm_conf);
         blockSend = isBlockSend(storm_conf);
-
         directlySend = isDirectSend(storm_conf);
 
         flush_later = new AtomicBoolean(false);
-        flushCheckInterval =
-                Utils.getInt(storm_conf
-                        .get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
+        flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
 
         Runnable flusher = new Runnable() {
             @Override
@@ -95,13 +85,11 @@ class NettyClientAsync extends NettyClient {
             }
         };
         long initialDelay = Math.min(1000, max_sleep_ms * max_retries);
-        scheduler.scheduleAtFixedRate(flusher, initialDelay,
-                flushCheckInterval, TimeUnit.MILLISECONDS);
+        scheduler.scheduleAtFixedRate(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);
 
         clientChannelFactory = factory;
 
         start();
-
         LOG.info(this.toString());
     }
 
@@ -109,7 +97,7 @@ class NettyClientAsync extends NettyClient {
      * Enqueue a task message to be sent to server
      */
     @Override
-    synchronized public void send(List<TaskMessage> messages) {
+    public synchronized void send(List<TaskMessage> messages) {
         // throw exception if the client is being closed
         if (isClosed()) {
             LOG.warn("Client is being closed, and does not take requests any more");
@@ -123,13 +111,14 @@ class NettyClientAsync extends NettyClient {
             throw new RuntimeException(e);
         } finally {
             long end = System.nanoTime();
-            sendTimer.update((end - start)/1000000.0d);
-
+            if (sendTimer != null) {
+                sendTimer.update((end - start) / TimeUtils.NS_PER_US);
+            }
         }
     }
 
     @Override
-    synchronized public void send(TaskMessage message) {
+    public synchronized void send(TaskMessage message) {
         // throw exception if the client is being closed
         if (isClosed()) {
             LOG.warn("Client is being closed, and does not take requests any more");
@@ -143,7 +132,9 @@ class NettyClientAsync extends NettyClient {
             throw new RuntimeException(e);
         } finally {
             long end = System.nanoTime();
-            sendTimer.update((end - start)/1000000.0d);
+            if (sendTimer != null) {
+                sendTimer.update((end - start) / TimeUtils.NS_PER_US);
+            }
         }
     }
 
@@ -159,21 +150,17 @@ class NettyClientAsync extends NettyClient {
             long now = System.currentTimeMillis();
             long delt = now - begin;
             if (oneSecond.check() == true) {
-                LOG.warn(
-                        "Target server  {} is unavailable, pending {}, bufferSize {}, block sending {}ms",
-                        name, pendings.get(), cachedSize, delt);
+                LOG.warn("Target server  {} is unavailable, pending {}, bufferSize {}, block sending {}ms", name, pendings.get(), cachedSize, delt);
             }
 
             if (timeoutIntervalCheck.check() == true) {
                 if (messageBatchRef.get() != null) {
-                    LOG.warn(
-                            "Target server  {} is unavailable, wait too much time, throw timeout message",
-                            name);
+                    LOG.warn("Target server  {} is unavailable, wait too much time, throw timeout message", name);
                     messageBatchRef.set(null);
                 }
                 setChannel(null);
                 LOG.warn("Reset channel as null");
-                
+
                 if (blockSend == false) {
                     reconnect();
                     break;
@@ -184,12 +171,10 @@ class NettyClientAsync extends NettyClient {
             JStormUtils.sleepMs(sleepMs);
 
             if (delt > 2 * timeoutMs * 1000L && changeThreadhold == false) {
-                if (channelRef.get() != null
-                        && BATCH_THREASHOLD_WARN >= 2 * messageBatchSize) {
+                if (channelRef.get() != null && BATCH_THREASHOLD_WARN >= 2 * messageBatchSize) {
                     // it is just channel isn't writable;
                     BATCH_THREASHOLD_WARN = BATCH_THREASHOLD_WARN / 2;
-                    LOG.info("Reduce BATCH_THREASHOLD_WARN to {}",
-                            BATCH_THREASHOLD_WARN);
+                    LOG.info("Reduce BATCH_THREASHOLD_WARN to {}", BATCH_THREASHOLD_WARN);
 
                     changeThreadhold = true;
                 }
@@ -296,12 +281,9 @@ class NettyClientAsync extends NettyClient {
         } else {
             if (messageBatchRef.compareAndSet(null, messageBatch)) {
                 flush_later.set(true);
-            }
-            else
+            } else
                 LOG.error("MessageBatch will be lost. This should not happen.");
         }
-
-        return;
     }
 
     void flush() {
@@ -339,12 +321,10 @@ class NettyClientAsync extends NettyClient {
     @Override
     public void handleResponse() {
         // do nothing
-        return;
     }
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java
index c239dd1..2f08957 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyClientSync.java
@@ -17,43 +17,35 @@
  */
 package com.alibaba.jstorm.message.netty;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.utils.DisruptorQueue;
 import backtype.storm.utils.Utils;
-
+import com.alibaba.jstorm.common.metric.AsmGauge;
 import com.alibaba.jstorm.common.metric.QueueGauge;
-import com.alibaba.jstorm.metric.JStormHealthCheck;
-import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.*;
 import com.alibaba.jstorm.utils.JStormServerUtils;
 import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
 import com.codahale.metrics.Gauge;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
 
 class NettyClientSync extends NettyClient implements EventHandler {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(NettyClientSync.class);
+    private static final Logger LOG = LoggerFactory.getLogger(NettyClientSync.class);
 
     private ConcurrentLinkedQueue<MessageBatch> batchQueue;
     private DisruptorQueue disruptorQueue;
@@ -63,20 +55,14 @@ class NettyClientSync extends NettyClient implements EventHandler {
     private AtomicLong emitTs = new AtomicLong(0);
 
     @SuppressWarnings("rawtypes")
-    NettyClientSync(Map storm_conf, ChannelFactory factory,
-            ScheduledExecutorService scheduler, String host, int port,
-            ReconnectRunnable reconnector) {
+    NettyClientSync(Map storm_conf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port, ReconnectRunnable reconnector) {
         super(storm_conf, factory, scheduler, host, port, reconnector);
 
         batchQueue = new ConcurrentLinkedQueue<MessageBatch>();
 
-        WaitStrategy waitStrategy =
-                (WaitStrategy) JStormUtils
-                        .createDisruptorWaitStrategy(storm_conf);
+        WaitStrategy waitStrategy = (WaitStrategy) Utils.newInstance((String) storm_conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY));
 
-        disruptorQueue =
-                DisruptorQueue.mkInstance(name, ProducerType.MULTI,
-                        MAX_SEND_PENDING * 8, waitStrategy);
+        disruptorQueue = DisruptorQueue.mkInstance(name, ProducerType.MULTI, MAX_SEND_PENDING * 8, waitStrategy);
         disruptorQueue.consumerStarted();
 
         if (connectMyself == false) {
@@ -93,21 +79,14 @@ class NettyClientSync extends NettyClient implements EventHandler {
         scheduler.scheduleAtFixedRate(trigger, 10, 1, TimeUnit.SECONDS);
 
         /**
-         * In sync mode, it can't directly use common factory, it will occur
-         * problem when client close and restart
+         * In sync mode the common factory can't be used directly, because that causes problems when a client is closed and restarted
          */
-        ThreadFactory bossFactory =
-                new NettyRenameThreadFactory(MetricDef.NETTY_CLI
-                        + JStormServerUtils.getName(host, port) + "-boss");
+        ThreadFactory bossFactory = new NettyRenameThreadFactory(MetricDef.NETTY_CLI + JStormServerUtils.getName(host, port) + "-boss");
         bossExecutor = Executors.newCachedThreadPool(bossFactory);
-        ThreadFactory workerFactory =
-                new NettyRenameThreadFactory(MetricDef.NETTY_CLI
-                        + JStormServerUtils.getName(host, port) + "-worker");
+        ThreadFactory workerFactory = new NettyRenameThreadFactory(MetricDef.NETTY_CLI + JStormServerUtils.getName(host, port) + "-worker");
         workerExecutor = Executors.newCachedThreadPool(workerFactory);
 
-        clientChannelFactory =
-                new NioClientSocketChannelFactory(bossExecutor, workerExecutor,
-                        1);
+        clientChannelFactory = new NioClientSocketChannelFactory(bossExecutor, workerExecutor, 1);
 
         start();
 
@@ -115,24 +94,24 @@ class NettyClientSync extends NettyClient implements EventHandler {
     }
 
     public void registerSyncMetrics() {
-        JStormMetrics.registerWorkerGauge(new Gauge<Double>() {
-            @Override
-            public Double getValue() {
-                return Double.valueOf(batchQueue.size());
-            }
-        }, MetricDef.NETTY_CLI_SYNC_BATCH_QUEUE, nettyConnection.toString());
-
-        QueueGauge cacheQueueGauge =
-                new QueueGauge(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE
-                        + nettyConnection.toString(), disruptorQueue);
-
-        JStormMetrics
-                .registerWorkerGauge(cacheQueueGauge,
-                        MetricDef.NETTY_CLI_SYNC_DISR_QUEUE,
-                        nettyConnection.toString());
-        JStormHealthCheck.registerWorkerHealthCheck(
-                MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + ":"
-                        + nettyConnection.toString(), cacheQueueGauge);
+        if (enableNettyMetrics) {
+            JStormMetrics.registerNettyMetric(MetricUtils
+                            .nettyMetricName(MetricDef.NETTY_CLI_SYNC_BATCH_QUEUE + nettyConnection.toString(), MetricType.GAUGE),
+                    new AsmGauge(new Gauge<Double>() {
+                        @Override
+                        public Double getValue() {
+                            return (double) batchQueue.size();
+                        }
+                    }));
+
+            QueueGauge cacheQueueGauge = new QueueGauge(disruptorQueue, MetricDef.NETTY_CLI_SYNC_DISR_QUEUE, nettyConnection.toString());
+
+            JStormMetrics.registerNettyMetric(MetricUtils
+                            .nettyMetricName(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + nettyConnection.toString(), MetricType.GAUGE),
+                    new AsmGauge(cacheQueueGauge));
+            JStormHealthCheck.registerWorkerHealthCheck(
+                    MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + ":" + nettyConnection.toString(), cacheQueueGauge);
+        }
     }
 
     /**
@@ -166,8 +145,6 @@ class NettyClientSync extends NettyClient implements EventHandler {
 
     /**
      * Don't take care of competition
-     * 
-     * @param blocked
      */
     public void sendData() {
         long start = System.nanoTime();
@@ -188,12 +165,13 @@ class NettyClientSync extends NettyClient implements EventHandler {
             JStormUtils.halt_process(-1, err);
         } finally {
             long end = System.nanoTime();
-            sendTimer.update((end - start) / 1000000.0d);
+            if (sendTimer != null) {
+                sendTimer.update((end - start) / TimeUtils.NS_PER_US);
+            }
         }
     }
 
     public void sendAllData() {
-
         long start = System.nanoTime();
         try {
             disruptorQueue.consumeBatch(this);
@@ -216,7 +194,9 @@ class NettyClientSync extends NettyClient implements EventHandler {
             JStormUtils.halt_process(-1, err);
         } finally {
             long end = System.nanoTime();
-            sendTimer.update((end - start) / 1000000.0d);
+            if (sendTimer != null) {
+                sendTimer.update((end - start) / TimeUtils.NS_PER_US);
+            }
         }
     }
 
@@ -227,8 +207,7 @@ class NettyClientSync extends NettyClient implements EventHandler {
     }
 
     @Override
-    public void onEvent(Object event, long sequence, boolean endOfBatch)
-            throws Exception {
+    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
         if (event == null) {
             return;
         }
@@ -296,22 +275,19 @@ class NettyClientSync extends NettyClient implements EventHandler {
     }
 
     public void unregisterSyncMetrics() {
-        JStormMetrics.unregisterWorkerMetric(
-                MetricDef.NETTY_CLI_SYNC_BATCH_QUEUE,
-                nettyConnection.toString());
-        JStormMetrics
-                .unregisterWorkerMetric(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE,
-                        nettyConnection.toString());
-        JStormHealthCheck
-                .unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE
-                        + ":" + nettyConnection.toString());
+        if (enableNettyMetrics) {
+            JStormMetrics.unregisterNettyMetric(MetricUtils
+                    .nettyMetricName(MetricDef.NETTY_CLI_SYNC_BATCH_QUEUE + nettyConnection.toString(), MetricType.GAUGE));
+            JStormMetrics.unregisterNettyMetric(MetricUtils
+                    .nettyMetricName(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + nettyConnection.toString(), MetricType.GAUGE));
+            JStormHealthCheck
+                    .unregisterWorkerHealthCheck(MetricDef.NETTY_CLI_SYNC_DISR_QUEUE + ":" + nettyConnection.toString());
+        }
     }
 
     @Override
     public void close() {
-        LOG.info(
-                "Begin to close connection to {} and flush all data, batchQueue {}, disruptor {}",
-                name, batchQueue.size(), disruptorQueue.population());
+        LOG.info("Begin to close connection to {} and flush all data, batchQueue {}, disruptor {}", name, batchQueue.size(), disruptorQueue.population());
         sendAllData();
         disruptorQueue.haltWithInterrupt();
         if (connectMyself == false) {
@@ -326,7 +302,6 @@ class NettyClientSync extends NettyClient implements EventHandler {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java
index cd8c0fa..4f2358f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyConnection.java
@@ -21,23 +21,23 @@ import java.io.Serializable;
 
 import com.alibaba.jstorm.utils.NetWorkUtils;
 
-public class NettyConnection implements Serializable{
+public class NettyConnection implements Serializable {
     protected String clientPort;
     protected String serverPort;
-    
+
     public String getClientPort() {
         return clientPort;
     }
-    
+
     public void setClientPort(String client, int port) {
         String ip = NetWorkUtils.host2Ip(client);
         clientPort = ip + ":" + port;
     }
-    
+
     public String getServerPort() {
         return serverPort;
     }
-    
+
     public void setServerPort(String server, int port) {
         String ip = NetWorkUtils.host2Ip(server);
         serverPort = ip + ":" + port;
@@ -47,12 +47,8 @@ public class NettyConnection implements Serializable{
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result =
-                prime * result
-                        + ((clientPort == null) ? 0 : clientPort.hashCode());
-        result =
-                prime * result
-                        + ((serverPort == null) ? 0 : serverPort.hashCode());
+        result = prime * result + ((clientPort == null) ? 0 : clientPort.hashCode());
+        result = prime * result + ((serverPort == null) ? 0 : serverPort.hashCode());
         return result;
     }
 
@@ -77,15 +73,14 @@ public class NettyConnection implements Serializable{
             return false;
         return true;
     }
-    
+
     @Override
     public String toString() {
-        return clientPort  + "->" + serverPort;
+        return clientPort + "->" + serverPort;
     }
-    
-    public static String mkString(String client, int clientPort, 
-            String server, int serverPort) {
+
+    public static String mkString(String client, int clientPort, String server, int serverPort) {
         return client + ":" + clientPort + "->" + server + ":" + serverPort;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java
index a6ddd9a..1a090f2 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyContext.java
@@ -40,8 +40,7 @@ import com.alibaba.jstorm.metric.MetricDef;
 import com.alibaba.jstorm.utils.JStormUtils;
 
 public class NettyContext implements IContext {
-    private final static Logger LOG = LoggerFactory
-            .getLogger(NettyContext.class);
+    private final static Logger LOG = LoggerFactory.getLogger(NettyContext.class);
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
 
@@ -65,36 +64,19 @@ public class NettyContext implements IContext {
     public void prepare(Map storm_conf) {
         this.storm_conf = storm_conf;
 
-        int maxWorkers =
-                Utils.getInt(storm_conf
-                        .get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
-        ThreadFactory bossFactory =
-                new NettyRenameThreadFactory(MetricDef.NETTY_CLI + "boss");
-        ThreadFactory workerFactory =
-                new NettyRenameThreadFactory(MetricDef.NETTY_CLI + "worker");
+        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+        ThreadFactory bossFactory = new NettyRenameThreadFactory(MetricDef.NETTY_CLI + "boss");
+        ThreadFactory workerFactory = new NettyRenameThreadFactory(MetricDef.NETTY_CLI + "worker");
 
         if (maxWorkers > 0) {
             clientChannelFactory =
-                    new NioClientSocketChannelFactory(
-                            Executors.newCachedThreadPool(bossFactory),
-                            Executors.newCachedThreadPool(workerFactory),
-                            maxWorkers);
+                    new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory), maxWorkers);
         } else {
-            clientChannelFactory =
-                    new NioClientSocketChannelFactory(
-                            Executors.newCachedThreadPool(bossFactory),
-                            Executors.newCachedThreadPool(workerFactory));
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory));
         }
-        int otherWorkers =
-                Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1;
-        int poolSize =
-                Math.min(Math.max(1, otherWorkers),
-                        MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
-        clientScheduleService =
-                Executors
-                        .newScheduledThreadPool(poolSize,
-                                new NettyRenameThreadFactory(
-                                        "client-schedule-service"));
+        int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1;
+        int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
+        clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
 
         reconnector = new ReconnectRunnable();
         new AsyncLoopThread(reconnector, true, Thread.MIN_PRIORITY, true);
@@ -119,11 +101,9 @@ public class NettyContext implements IContext {
     @Override
     public IConnection connect(String topology_id, String host, int port) {
         if (isSyncMode == true) {
-            return new NettyClientSync(storm_conf, clientChannelFactory,
-                    clientScheduleService, host, port, reconnector);
+            return new NettyClientSync(storm_conf, clientChannelFactory, clientScheduleService, host, port, reconnector);
         } else {
-            return new NettyClientAsync(storm_conf, clientChannelFactory,
-                    clientScheduleService, host, port, reconnector);
+            return new NettyClientAsync(storm_conf, clientChannelFactory, clientScheduleService, host, port, reconnector);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java
index 2e060c2..5d38fc5 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyRenameThreadFactory.java
@@ -27,8 +27,7 @@ public class NettyRenameThreadFactory implements ThreadFactory {
 
     static {
         // Rename Netty threads
-        ThreadRenamingRunnable
-                .setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
     }
 
     final ThreadGroup group;
@@ -37,15 +36,12 @@ public class NettyRenameThreadFactory implements ThreadFactory {
 
     NettyRenameThreadFactory(String name) {
         SecurityManager s = System.getSecurityManager();
-        group =
-                (s != null) ? s.getThreadGroup() : Thread.currentThread()
-                        .getThreadGroup();
+        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
         this.name = name;
     }
 
     public Thread newThread(Runnable r) {
-        Thread t =
-                new Thread(group, r, name + "-" + index.getAndIncrement(), 0);
+        Thread t = new Thread(group, r, name + "-" + index.getAndIncrement(), 0);
         if (t.isDaemon())
             t.setDaemon(false);
         if (t.getPriority() != Thread.NORM_PRIORITY)

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java
index d00b24f..a5fa859 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/NettyServer.java
@@ -44,21 +44,19 @@ import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.client.ConfigExtension;
 
 class NettyServer implements IConnection {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(NettyServer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class);
     @SuppressWarnings("rawtypes")
     Map storm_conf;
     int port;
 
     // private LinkedBlockingQueue message_queue;
-    volatile ChannelGroup allChannels =
-            new DefaultChannelGroup("jstorm-server");
+    volatile ChannelGroup allChannels = new DefaultChannelGroup("jstorm-server");
     final ChannelFactory factory;
     final ServerBootstrap bootstrap;
 
     // ayncBatch is only one solution, so directly set it as true
     private final boolean isSyncMode;
-    
+
     private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues;
 
     @SuppressWarnings("rawtypes")
@@ -69,30 +67,17 @@ class NettyServer implements IConnection {
         this.deserializeQueues = deserializeQueues;
 
         // Configure the server.
-        int buffer_size =
-                Utils.getInt(storm_conf
-                        .get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
-        int maxWorkers =
-                Utils.getInt(storm_conf
-                        .get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
+        int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
 
         // asyncBatch = ConfigExtension.isNettyTransferAsyncBatch(storm_conf);
 
-        ThreadFactory bossFactory =
-                new NettyRenameThreadFactory("server" + "-boss");
-        ThreadFactory workerFactory =
-                new NettyRenameThreadFactory("server" + "-worker");
+        ThreadFactory bossFactory = new NettyRenameThreadFactory("server" + "-boss");
+        ThreadFactory workerFactory = new NettyRenameThreadFactory("server" + "-worker");
         if (maxWorkers > 0) {
-            factory =
-                    new NioServerSocketChannelFactory(
-                            Executors.newCachedThreadPool(bossFactory),
-                            Executors.newCachedThreadPool(workerFactory),
-                            maxWorkers);
+            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory), maxWorkers);
         } else {
-            factory =
-                    new NioServerSocketChannelFactory(
-                            Executors.newCachedThreadPool(bossFactory),
-                            Executors.newCachedThreadPool(workerFactory));
+            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory));
         }
 
         bootstrap = new ServerBootstrap(factory);
@@ -108,8 +93,7 @@ class NettyServer implements IConnection {
         Channel channel = bootstrap.bind(new InetSocketAddress(port));
         allChannels.add(channel);
 
-        LOG.info("Successfull bind {}, buffer_size:{}, maxWorkers:{}", port,
-                buffer_size, maxWorkers);
+        LOG.info("Successfull bind {}, buffer_size:{}, maxWorkers:{}", port, buffer_size, maxWorkers);
     }
 
     @Override
@@ -129,8 +113,7 @@ class NettyServer implements IConnection {
 
         DisruptorQueue queue = deserializeQueues.get(task);
         if (queue == null) {
-            LOG.debug("Received invalid message directed at port " + task
-                    + ". Dropping...");
+            LOG.debug("Received invalid message directed at port " + task + ". Dropping...");
             return;
         }
 
@@ -138,8 +121,7 @@ class NettyServer implements IConnection {
     }
 
     /**
-     * fetch a message from message queue synchronously (flags != 1) or
-     * asynchronously (flags==1)
+     * Fetch a message from the message queue synchronously (flags != 1) or asynchronously (flags == 1)
      */
     public Object recv(Integer taskId, int flags) {
         try {
@@ -211,14 +193,12 @@ class NettyServer implements IConnection {
 
     @Override
     public void send(List<TaskMessage> messages) {
-        throw new UnsupportedOperationException(
-                "Server connection should not send any messages");
+        throw new UnsupportedOperationException("Server connection should not send any messages");
     }
 
     @Override
     public void send(TaskMessage message) {
-        throw new UnsupportedOperationException(
-                "Server connection should not send any messages");
+        throw new UnsupportedOperationException("Server connection should not send any messages");
     }
 
     @Override
@@ -231,4 +211,8 @@ class NettyServer implements IConnection {
         return isSyncMode;
     }
 
+    @Override
+    public boolean available() {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java
index dcf2a5d..f5ec324 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ReconnectRunnable.java
@@ -26,11 +26,9 @@ import org.slf4j.LoggerFactory;
 import com.alibaba.jstorm.callback.RunnableCallback;
 
 public class ReconnectRunnable extends RunnableCallback {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(ReconnectRunnable.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ReconnectRunnable.class);
 
-    private BlockingQueue<NettyClient> queue =
-            new LinkedBlockingDeque<NettyClient>();
+    private BlockingQueue<NettyClient> queue = new LinkedBlockingDeque<NettyClient>();
 
     public void pushEvent(NettyClient client) {
         queue.offer(client);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java
index f84c2f0..7b511e9 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientHandler.java
@@ -30,8 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StormClientHandler extends SimpleChannelUpstreamHandler {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(StormClientHandler.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
     private NettyClient client;
     private AtomicBoolean being_closed;
 
@@ -41,16 +40,25 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     /**
-     * Sometime when connect one bad channel which isn't writable, it will call
-     * this function
+     * @@@ This function is intentionally commented out.
+     *
+     * Calls from the low-level netty layer must not be allowed here: such a call tries to obtain the lock of the
+     * jstorm netty layer, which would lead to deadlock.
+     */
+//    @Override
+//    public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+//        
+//    	client.notifyInterestChanged(e.getChannel());
+//    }
+
+    /**
+     * Sometimes, when connecting to a bad channel that isn't writable, this function is called
      */
     @Override
-    public void channelConnected(ChannelHandlerContext ctx,
-            ChannelStateEvent event) {
+    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
         // register the newly established channel
         Channel channel = event.getChannel();
-        LOG.info("connection established to :{}, local port:{}",
-                client.getRemoteAddr(), channel.getLocalAddress());
+        LOG.info("connection established to :{}, local port:{}", client.getRemoteAddr(), channel.getLocalAddress());
 
         client.handleResponse();
     }
@@ -63,8 +71,8 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
 
     /**
      * 
-     * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#exceptionCaught(org.jboss.netty.channel.ChannelHandlerContext,
-     *      org.jboss.netty.channel.ExceptionEvent)
+     * @see SimpleChannelUpstreamHandler#exceptionCaught(ChannelHandlerContext,
+     *      ExceptionEvent)
      */
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
@@ -82,14 +90,12 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
     /**
      * Attention please,
      * 
-     * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#channelDisconnected(org.jboss.netty.channel.ChannelHandlerContext,
-     *      org.jboss.netty.channel.ChannelStateEvent)
+     * @see SimpleChannelUpstreamHandler#channelDisconnected(ChannelHandlerContext,
+     *      ChannelStateEvent)
      */
     @Override
-    public void channelDisconnected(ChannelHandlerContext ctx,
-            ChannelStateEvent e) throws Exception {
-        LOG.info("Receive channelDisconnected to {}, channel = {}",
-                client.getRemoteAddr(), e.getChannel());
+    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        LOG.info("Receive channelDisconnected to {}, channel = {}", client.getRemoteAddr(), e.getChannel());
         // ctx.sendUpstream(e);
         super.channelDisconnected(ctx, e);
 
@@ -97,10 +103,8 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
     }
 
     @Override
-    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
-            throws Exception {
-        LOG.info("Connection to {} has been closed, channel = {}",
-                client.getRemoteAddr(), e.getChannel());
+    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        LOG.info("Connection to {} has been closed, channel = {}", client.getRemoteAddr(), e.getChannel());
         super.channelClosed(ctx, e);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java
index 080f91c..8927809 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormClientPipelineFactory.java
@@ -27,12 +27,12 @@ import com.alibaba.jstorm.client.ConfigExtension;
 
 class StormClientPipelineFactory implements ChannelPipelineFactory {
     private NettyClient client;
-    private Map         conf;
+    private Map conf;
 
     StormClientPipelineFactory(NettyClient client, Map conf) {
         this.client = client;
         this.conf = conf;
-        
+
     }
 
     public ChannelPipeline getPipeline() throws Exception {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java
index 916ce93..c1b9cf2 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerHandler.java
@@ -33,8 +33,7 @@ import org.slf4j.LoggerFactory;
 import backtype.storm.messaging.TaskMessage;
 
 class StormServerHandler extends SimpleChannelUpstreamHandler {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(StormServerHandler.class);
+    private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
     private NettyServer server;
     private Map<Channel, Integer> failureCounters;
 
@@ -71,29 +70,28 @@ class StormServerHandler extends SimpleChannelUpstreamHandler {
         LOG.info("Connection established {}", e.getChannel().getRemoteAddress());
         server.addChannel(e.getChannel());
     }
-    
+
     @Override
-    public void childChannelClosed(
-            ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception {
+    public void childChannelClosed(ChannelHandlerContext ctx, ChildChannelStateEvent e) throws Exception {
         super.childChannelClosed(ctx, e);
         LOG.info("Connection closed {}", e.getChildChannel().getRemoteAddress());
-        
+
         MessageDecoder.removeTransmitHistogram(e.getChildChannel());
     }
-    
+
     @Override
     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
         super.channelDisconnected(ctx, e);
         LOG.info("Connection channelDisconnected {}", e.getChannel().getRemoteAddress());
-        
+
         MessageDecoder.removeTransmitHistogram(e.getChannel());
     };
-    
+
     @Override
     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
         super.channelClosed(ctx, e);
         LOG.info("Connection channelClosed {}", e.getChannel().getRemoteAddress());
-        
+
         MessageDecoder.removeTransmitHistogram(e.getChannel());
     };
 
@@ -131,8 +129,7 @@ class StormServerHandler extends SimpleChannelUpstreamHandler {
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
         // removeFailureCounter(e.getChannel());
         if (e.getChannel() != null) {
-            LOG.info("Channel occur exception {}", e.getChannel()
-                    .getRemoteAddress());
+            LOG.info("Channel occur exception {}", e.getChannel().getRemoteAddress());
         }
 
         server.closeChannel(e.getChannel());

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java
index 9dead91..6489d4f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/StormServerPipelineFactory.java
@@ -26,7 +26,7 @@ import org.jboss.netty.channel.Channels;
 class StormServerPipelineFactory implements ChannelPipelineFactory {
     private NettyServer server;
     private Map conf;
-    
+
     StormServerPipelineFactory(NettyServer server, Map conf) {
         this.server = server;
         this.conf = conf;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AlimonitorClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AlimonitorClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AlimonitorClient.java
deleted file mode 100755
index 760e538..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AlimonitorClient.java
+++ /dev/null
@@ -1,267 +0,0 @@
-package com.alibaba.jstorm.metric;
-
-import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.NameValuePair;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.util.EntityUtils;
-import org.apache.log4j.Logger;
-
-import backtype.storm.utils.Utils;
-
-public class AlimonitorClient extends MetricSendClient {
-
-    public static Logger LOG = Logger.getLogger(AlimonitorClient.class);
-
-    // Send to localhost:15776 by default
-    public static final String DEFAUT_ADDR = "127.0.0.1";
-    public static final String DEFAULT_PORT = "15776";
-    public static final int DEFAUTL_FLAG = 0;
-    public static final String DEFAULT_ERROR_INFO = "";
-
-    private final String COLLECTION_FLAG = "collection_flag";
-    private final String ERROR_INFO = "error_info";
-    private final String MSG = "MSG";
-
-    private String port;
-    private String requestIP;
-    private String monitorName;
-    private int collectionFlag;
-    private String errorInfo;
-
-    private boolean post;
-
-    public AlimonitorClient() {
-    }
-
-    public AlimonitorClient(String requestIP, String port, boolean post) {
-        this.requestIP = requestIP;
-        this.port = port;
-        this.post = post;
-        this.monitorName = null;
-        this.collectionFlag = 0;
-        this.errorInfo = null;
-    }
-
-    public void setIpAddr(String ipAddr) {
-        this.requestIP = ipAddr;
-    }
-
-    public void setPort(String port) {
-        this.port = port;
-    }
-
-    public void setMonitorName(String monitorName) {
-        this.monitorName = monitorName;
-    }
-
-    public void setCollectionFlag(int flag) {
-        this.collectionFlag = flag;
-    }
-
-    public void setErrorInfo(String msg) {
-        this.errorInfo = msg;
-    }
-
-    public void setPostFlag(boolean post) {
-        this.post = post;
-    }
-
-    public String buildURL() {
-        return "http://" + requestIP + ":" + port + "/passive";
-    }
-
-    public String buildRqstAddr() {
-        return "http://" + requestIP + ":" + port + "/passive?name="
-                + monitorName + "&msg=";
-    }
-
-    @Override
-    public boolean send(Map<String, Object> msg) {
-        try {
-            if (monitorName == null) {
-                LOG.warn("monitor name is null");
-                return false;
-            }
-            return sendRequest(collectionFlag, errorInfo, msg);
-        } catch (Exception e) {
-            LOG.error("Failed to sendRequest", e);
-            return false;
-        }
-    }
-
-    @Override
-    public boolean send(List<Map<String, Object>> msg) {
-        try {
-            if (monitorName == null) {
-                LOG.warn("monitor name is null");
-                return false;
-            }
-            return sendRequest(collectionFlag, errorInfo, msg);
-        } catch (Exception e) {
-            LOG.error("Failed to sendRequest", e);
-            return false;
-        }
-    }
-
-    public Map buildAliMonitorMsg(int collection_flag, String error_message) {
-        // Json format of the message sent to Alimonitor
-        // {
-        // "collection_flag":int,
-        // "error_info":string,
-        // "MSG": ojbect | array
-        // }
-        Map ret = new HashMap();
-        ret.put(COLLECTION_FLAG, collection_flag);
-        ret.put(ERROR_INFO, error_message);
-        ret.put(MSG, null);
-
-        return ret;
-    }
-
-    private void addMsgData(Map jsonObj, Map<String, Object> map) {
-        jsonObj.put(MSG, map);
-    }
-
-    private void addMsgData(Map jsonObj, List<Map<String, Object>> mapList) {
-        // JSONArray jsonArray = new JSONArray();
-        // for(Map<String, Object> map : mapList) {
-        // jsonArray.add(map);
-        // }
-
-        jsonObj.put(MSG, mapList);
-    }
-
-    private boolean sendRequest(int collection_flag, String error_message,
-            Map<String, Object> msg) throws Exception {
-        boolean ret = false;
-
-        if (msg.size() == 0)
-            return ret;
-
-        Map jsonObj = buildAliMonitorMsg(collection_flag, error_message);
-        addMsgData(jsonObj, msg);
-        String jsonMsg = jsonObj.toString();
-        LOG.info(jsonMsg);
-
-        if (post == true) {
-            String url = buildURL();
-            ret = httpPost(url, jsonMsg);
-        } else {
-            String request = buildRqstAddr();
-            StringBuilder postAddr = new StringBuilder();
-            postAddr.append(request);
-            postAddr.append(URLEncoder.encode(jsonMsg));
-
-            ret = httpGet(postAddr);
-        }
-
-        return ret;
-    }
-
-    private boolean sendRequest(int collection_flag, String error_message,
-            List<Map<String, Object>> msgList) throws Exception {
-        boolean ret = false;
-
-        if (msgList.size() == 0)
-            return ret;
-
-        Map jsonObj = buildAliMonitorMsg(collection_flag, error_message);
-        addMsgData(jsonObj, msgList);
-
-        String jsonMsg = Utils.to_json(jsonObj);
-        LOG.info(jsonMsg);
-
-        if (post == true) {
-            String url = buildURL();
-            ret = httpPost(url, jsonMsg);
-        } else {
-            String request = buildRqstAddr();
-            StringBuilder postAddr = new StringBuilder();
-            postAddr.append(request);
-            postAddr.append(URLEncoder.encode(jsonMsg));
-
-            ret = httpGet(postAddr);
-        }
-
-        return ret;
-    }
-
-    private boolean httpGet(StringBuilder postAddr) {
-        boolean ret = false;
-
-        CloseableHttpClient httpClient = HttpClientBuilder.create().build();
-        CloseableHttpResponse response = null;
-
-        try {
-            HttpGet request = new HttpGet(postAddr.toString());
-            response = httpClient.execute(request);
-            HttpEntity entity = response.getEntity();
-            if (entity != null) {
-                LOG.info(EntityUtils.toString(entity));
-            }
-            EntityUtils.consume(entity);
-            ret = true;
-        } catch (Exception e) {
-            LOG.error("Exception when sending http request to alimonitor", e);
-        } finally {
-            try {
-                if (response != null)
-                    response.close();
-                httpClient.close();
-            } catch (Exception e) {
-                LOG.error("Exception when closing httpclient", e);
-            }
-        }
-
-        return ret;
-    }
-
-    private boolean httpPost(String url, String msg) {
-        boolean ret = false;
-
-        CloseableHttpClient httpClient = HttpClientBuilder.create().build();
-        CloseableHttpResponse response = null;
-
-        try {
-            HttpPost request = new HttpPost(url);
-            List<NameValuePair> nvps = new ArrayList<NameValuePair>();
-            nvps.add(new BasicNameValuePair("name", monitorName));
-            nvps.add(new BasicNameValuePair("msg", msg));
-            request.setEntity(new UrlEncodedFormEntity(nvps));
-            response = httpClient.execute(request);
-            HttpEntity entity = response.getEntity();
-            if (entity != null) {
-                LOG.info(EntityUtils.toString(entity));
-            }
-            EntityUtils.consume(entity);
-            ret = true;
-        } catch (Exception e) {
-            LOG.error("Exception when sending http request to alimonitor", e);
-        } finally {
-            try {
-                if (response != null)
-                    response.close();
-                httpClient.close();
-            } catch (Exception e) {
-                LOG.error("Exception when closing httpclient", e);
-            }
-        }
-
-        return ret;
-    }
-
-    public void close() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricFilter.java
new file mode 100644
index 0000000..4313b6f
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricFilter.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.metric;
+
+import com.alibaba.jstorm.common.metric.AsmMetric;
+
+import java.io.Serializable;
+
+/**
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public interface AsmMetricFilter extends Serializable {
+    /**
+     * Matches all metrics, regardless of type or name.
+     */
+    AsmMetricFilter ALL = new AsmMetricFilter() {
+        private static final long serialVersionUID = 7089987006352295530L;
+
+        @Override
+        public boolean matches(String name, AsmMetric metric) {
+            return true;
+        }
+    };
+
+    /**
+     * Returns {@code true} if the metric matches the filter; {@code false} otherwise.
+     *
+     * @param name the metric name
+     * @param metric the metric
+     * @return {@code true} if the metric matches the filter
+     */
+    boolean matches(String name, AsmMetric metric);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricRegistry.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricRegistry.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricRegistry.java
new file mode 100644
index 0000000..710da9d
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricRegistry.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.metric;
+
+import com.alibaba.jstorm.common.metric.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Metric registry. Generally, methods of this class should not be exposed; wrapper methods in JStormMonitorMetrics should be called instead.
+ *
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class AsmMetricRegistry implements AsmMetricSet {
+    private static final long serialVersionUID = 8184106900230111064L;
+    private static final Logger LOG = LoggerFactory.getLogger(AsmMetricRegistry.class);
+
+    protected final ConcurrentMap<String, AsmMetric> metrics = new ConcurrentHashMap<String, AsmMetric>();
+
+    public int size() {
+        return metrics.size();
+    }
+
+    /**
+     * Given an {@link AsmMetric}, sets its metric name to {@code name} and registers it under that name if the name is not yet in use.
+     *
+     * @param name   the metric name
+     * @param metric the metric
+     * @param <T>    the type of the metric
+     * @return {@code metric} if it was newly registered, otherwise the metric already registered under {@code name}
+     * @throws IllegalArgumentException declared in the signature, but not thrown explicitly here; a duplicate name is only logged
+     */
+    @SuppressWarnings("unchecked")
+    public <T extends AsmMetric> AsmMetric register(String name, T metric) throws IllegalArgumentException {
+        metric.setMetricName(name);
+        final AsmMetric existing = metrics.putIfAbsent(name, metric);
+        if (existing == null) {
+            LOG.info("Successfully register metric of {}", name);
+            return metric;
+        } else {
+            LOG.warn("duplicate metric: {}", name);
+            return existing;
+        }
+    }
+
+    /**
+     * Removes the metric with the given name.
+     *
+     * @param name the metric name
+     * @return whether or not the metric was removed
+     */
+    public boolean remove(String name) {
+        final AsmMetric metric = metrics.remove(name);
+        if (metric != null) {
+            LOG.info("Successfully unregister metric of {}", name);
+            return true;
+        }
+        return false;
+    }
+
+    public AsmMetric getMetric(String name) {
+        return metrics.get(name); // null when nothing is registered under this name
+    }
+
+    /**
+     * Returns an unmodifiable, sorted snapshot of the names of all the metrics in the registry.
+     *
+     * @return the names of all the metrics
+     */
+    public SortedSet<String> getMetricNames() {
+        return Collections.unmodifiableSortedSet(new TreeSet<String>(metrics.keySet()));
+    }
+
+    /**
+     * Returns a map of all the gauges in the registry and their names.
+     *
+     * @return all the gauges in the registry
+     */
+    public SortedMap<String, AsmGauge> getGauges() {
+        return getGauges(AsmMetricFilter.ALL);
+    }
+
+    /**
+     * Returns a map of all the gauges in the registry and their names which match the given filter.
+     *
+     * @param filter the metric filter to match
+     * @return all the gauges in the registry which match the filter
+     */
+    public SortedMap<String, AsmGauge> getGauges(AsmMetricFilter filter) {
+        return getMetrics(AsmGauge.class, filter);
+    }
+
+    /**
+     * Returns a map of all the counters in the registry and their names.
+     *
+     * @return all the counters in the registry
+     */
+    public SortedMap<String, AsmCounter> getCounters() {
+        return getCounters(AsmMetricFilter.ALL);
+    }
+
+    /**
+     * Returns a map of all the counters in the registry and their names which match the given filter.
+     *
+     * @param filter the metric filter to match
+     * @return all the counters in the registry which match the filter
+     */
+    public SortedMap<String, AsmCounter> getCounters(AsmMetricFilter filter) {
+        return getMetrics(AsmCounter.class, filter);
+    }
+
+    /**
+     * Returns a map of all the histograms in the registry and their names.
+     *
+     * @return all the histograms in the registry
+     */
+    public SortedMap<String, AsmHistogram> getHistograms() {
+        return getHistograms(AsmMetricFilter.ALL);
+    }
+
+    /**
+     * Returns a map of all the histograms in the registry and their names which match the given filter.
+     *
+     * @param filter the metric filter to match
+     * @return all the histograms in the registry which match the filter
+     */
+    public SortedMap<String, AsmHistogram> getHistograms(AsmMetricFilter filter) {
+        return getMetrics(AsmHistogram.class, filter);
+    }
+
+    /**
+     * Returns a map of all the meters in the registry and their names.
+     *
+     * @return all the meters in the registry
+     */
+    public SortedMap<String, AsmMeter> getMeters() {
+        return getMeters(AsmMetricFilter.ALL);
+    }
+
+    /**
+     * Returns a map of all the meters in the registry and their names which match the given filter.
+     *
+     * @param filter the metric filter to match
+     * @return all the meters in the registry which match the filter
+     */
+    public SortedMap<String, AsmMeter> getMeters(AsmMetricFilter filter) {
+        return getMetrics(AsmMeter.class, filter);
+    }
+
+    /**
+     * Returns a map of all the timers in the registry and their names.
+     *
+     * @return all the timers in the registry
+     */
+    public SortedMap<String, AsmTimer> getTimers() {
+        return getTimers(AsmMetricFilter.ALL);
+    }
+
+    /**
+     * Returns a map of all the timers in the registry and their names which match the given filter.
+     *
+     * @param filter the metric filter to match
+     * @return all the timers in the registry which match the filter
+     */
+    public SortedMap<String, AsmTimer> getTimers(AsmMetricFilter filter) {
+        return getMetrics(AsmTimer.class, filter);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T extends AsmMetric> SortedMap<String, T> getMetrics(Class<T> klass, AsmMetricFilter filter) {
+        final TreeMap<String, T> timers = new TreeMap<String, T>(); // despite the name, holds metrics of whichever type klass selects
+        for (Map.Entry<String, AsmMetric> entry : metrics.entrySet()) {
+            if (klass.isInstance(entry.getValue()) && filter.matches(entry.getKey(), entry.getValue())) {
+                timers.put(entry.getKey(), (T) entry.getValue()); // cast is guarded by the klass.isInstance check above
+            }
+        }
+        return timers;
+    }
+
+    @Override
+    public Map<String, AsmMetric> getMetrics() {
+        return metrics; // returns the backing map itself, not a copy
+    }
+
+}