Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:41 UTC

[03/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java
index 05f745c..4bfbf2d 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java
@@ -17,53 +17,56 @@
  */
 package com.alibaba.jstorm.task.execute.spout;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.Config;
-import backtype.storm.Constants;
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.BatchTuple;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.DisruptorQueue;
-import backtype.storm.utils.WorkerClassLoader;
-
 import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.common.metric.Histogram;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
 import com.alibaba.jstorm.common.metric.TimerRatio;
 import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger;
 import com.alibaba.jstorm.daemon.worker.timer.TimerConstants;
 import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
 import com.alibaba.jstorm.metric.JStormMetrics;
+import com.alibaba.jstorm.metric.JStormMetricsReporter;
 import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
 import com.alibaba.jstorm.task.Task;
-import com.alibaba.jstorm.task.TaskBaseMetric;
-import com.alibaba.jstorm.task.TaskStatus;
-import com.alibaba.jstorm.task.TaskTransfer;
 import com.alibaba.jstorm.task.TaskBatchTransfer;
+import com.alibaba.jstorm.task.TaskTransfer;
 import com.alibaba.jstorm.task.acker.Acker;
 import com.alibaba.jstorm.task.comm.TaskSendTargets;
 import com.alibaba.jstorm.task.comm.TupleInfo;
-import com.alibaba.jstorm.task.error.ITaskReportErr;
 import com.alibaba.jstorm.task.execute.BaseExecutors;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.RotatingMap;
+import com.alibaba.jstorm.utils.TimeUtils;
+import com.codahale.metrics.Gauge;
 import com.lmax.disruptor.EventHandler;
 
+import backtype.storm.Config;
+import backtype.storm.Constants;
+import backtype.storm.spout.ISpout;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.BatchTuple;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.WorkerClassLoader;
+
 /**
  * spout executor
- * 
+ * <p/>
  * All spout actions will be done here
- * 
+ *
  * @author yannian/Longda
- * 
  */
 public class SpoutExecutors extends BaseExecutors implements EventHandler {
     private static Logger LOG = LoggerFactory.getLogger(SpoutExecutors.class);
@@ -73,123 +76,107 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
     protected backtype.storm.spout.ISpout spout;
     protected RotatingMap<Long, TupleInfo> pending;
 
-    protected ISpoutOutputCollector output_collector;
+    protected SpoutOutputCollector outputCollector;
 
-    protected boolean firstTime = true;
+    protected AsmHistogram nextTupleTimer;
+    protected AsmHistogram ackerTimer;
+    protected TimerRatio emptyCpuGauge;
 
-    protected Histogram nextTupleTimer;
-    protected Histogram ackerTimer;
-    protected TimerRatio emptyCpuCounter;
+    private String topologyId;
+    private String componentId;
+    private int taskId;
 
     protected AsyncLoopThread ackerRunnableThread;
 
     protected boolean isSpoutFullSleep;
 
-    public SpoutExecutors(Task task, backtype.storm.spout.ISpout _spout,
-            TaskTransfer _transfer_fn,
-            Map<Integer, DisruptorQueue> innerTaskTransfer, Map _storm_conf,
-            TaskSendTargets sendTargets, TaskStatus taskStatus,
-            TopologyContext topology_context, TopologyContext _user_context,
-            TaskBaseMetric _task_stats, ITaskReportErr _report_error) {
-        super(task, _transfer_fn, _storm_conf, innerTaskTransfer,
-                topology_context, _user_context, _task_stats, taskStatus,
-                _report_error);
+    //, backtype.storm.spout.ISpout _spout, TaskTransfer _transfer_fn, Map<Integer, DisruptorQueue> innerTaskTransfer,
+    //Map _storm_conf, TaskSendTargets sendTargets, TaskStatus taskStatus, TopologyContext topology_context, TopologyContext _user_context,
+    //TaskBaseMetric _task_stats, ITaskReportErr _report_error, JStormMetricsReporter metricReporter
+    public SpoutExecutors(Task task) {
+        super(task);
+
+        this.spout = (ISpout)task.getTaskObj();
 
-        this.spout = _spout;
+        this.max_spout_pending = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
 
-        this.max_spout_pending =
-                JStormUtils.parseInt(storm_conf
-                        .get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
+        this.topologyId = sysTopologyCtx.getTopologyId();
+        this.componentId = sysTopologyCtx.getThisComponentId();
+        this.taskId = task.getTaskId();
 
         this.nextTupleTimer =
-                JStormMetrics.registerTaskHistogram(taskId,
-                        MetricDef.EXECUTE_TIME);
+                (AsmHistogram) JStormMetrics.registerTaskMetric(
+                        MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EXECUTE_TIME, MetricType.HISTOGRAM), new AsmHistogram());
 
         this.ackerTimer =
-                JStormMetrics.registerTaskHistogram(taskId,
-                        MetricDef.ACKER_TIME);
+                (AsmHistogram) JStormMetrics.registerTaskMetric(
+                        MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.ACKER_TIME, MetricType.HISTOGRAM), new AsmHistogram());
 
-        this.emptyCpuCounter = new TimerRatio();
-        JStormMetrics.registerTaskGauge(emptyCpuCounter, taskId,
-                MetricDef.EMPTY_CPU_RATIO);
+        this.emptyCpuGauge = new TimerRatio();
+        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EMPTY_CPU_RATIO, MetricType.GAUGE),
+                new AsmGauge(emptyCpuGauge));
 
         isSpoutFullSleep = ConfigExtension.isSpoutPendFullSleep(storm_conf);
 
-        if (ConfigExtension.isTaskBatchTuple(storm_conf)) {
-            TaskBatchFlushTrigger batchFlushTrigger =
-                    new TaskBatchFlushTrigger(5, idStr
-                            + Constants.SYSTEM_COMPONENT_ID,
-                            (TaskBatchTransfer) _transfer_fn);
-            batchFlushTrigger.register(TimeUnit.MILLISECONDS);
-        }
-
         LOG.info("isSpoutFullSleep:" + isSpoutFullSleep);
-
-    }
-
-    public void prepare(TaskSendTargets sendTargets, TaskTransfer transferFn,
-            TopologyContext topologyContext) {
-
-        JStormMetrics.registerTaskGauge(
-                new com.codahale.metrics.Gauge<Double>() {
-
+        
+        mkPending();
+        
+        JStormMetrics.registerTaskMetric(
+        		MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.PENDING_MAP, MetricType.GAUGE), new AsmGauge(
+                new Gauge<Double>() {
                     @Override
                     public Double getValue() {
                         return (double) pending.size();
                     }
-
-                }, taskId, MetricDef.PENDING_MAP);
+                }));
 
         // collector, in fact it call send_spout_msg
-        this.output_collector =
-                new SpoutCollector(taskId, spout, task_stats, sendTargets,
-                        storm_conf, transferFn, pending, topologyContext,
-                        exeQueue, report_error);
-
-        try {
-            WorkerClassLoader.switchThreadContext();
-            this.spout.open(storm_conf, userTopologyCtx,
-                    new SpoutOutputCollector(output_collector));
-        } catch (Throwable e) {
-            error = e;
-            LOG.error("spout open error ", e);
-            report_error.report(e);
-        } finally {
-            WorkerClassLoader.restoreThreadContext();
-        }
+        SpoutCollector collector = new SpoutCollector(task, pending, exeQueue);
+        this.outputCollector = new SpoutOutputCollector(collector);
+        taskTransfer.getBackpressureController().setOutputCollector(outputCollector);
+        taskHbTrigger.setSpoutOutputCollector(outputCollector);
 
         LOG.info("Successfully create SpoutExecutors " + idStr);
-
+    }
+    
+    public void mkPending() {
+        // this method will be overridden by subclasses
+        throw new RuntimeException("Should override this function");
     }
 
-    public void nextTuple() {
-        if (firstTime == true) {
+    @Override
+    public void init() throws Exception {
+        
+        this.spout.open(storm_conf, userTopologyCtx, outputCollector);
 
-            int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf);
+        LOG.info("Successfully open SpoutExecutors " + idStr);
+        
+		int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf);
 
-            // wait other bolt is ready
-            JStormUtils.sleepMs(delayRun * 1000);
+		// wait until the other bolts are ready
+		JStormUtils.sleepMs(delayRun * 1000);
 
-            emptyCpuCounter.init();
+		if (taskStatus.isRun()) {
+			spout.activate();
+		} else {
+			spout.deactivate();
+		}
 
-            if (taskStatus.isRun() == true) {
-                spout.activate();
-            } else {
-                spout.deactivate();
-            }
+		LOG.info(idStr + " is ready ");
 
-            firstTime = false;
-            LOG.info(idStr + " is ready ");
-        }
+    }
 
-        if (taskStatus.isRun() == false) {
+    public void nextTuple() {
+        
+        if (!taskStatus.isRun()) {
             JStormUtils.sleepMs(1);
             return;
         }
 
         // if don't need ack, pending map will be always empty
         if (max_spout_pending == null || pending.size() < max_spout_pending) {
-            emptyCpuCounter.stop();
+            emptyCpuGauge.stop();
 
             long start = System.nanoTime();
             try {
@@ -200,15 +187,13 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
                 report_error.report(e);
             } finally {
                 long end = System.nanoTime();
-                nextTupleTimer.update((end - start) / 1000000.0d);
+                nextTupleTimer.update((end - start) / TimeUtils.NS_PER_US);
             }
-
-            return;
         } else {
             if (isSpoutFullSleep) {
                 JStormUtils.sleepMs(1);
             }
-            emptyCpuCounter.start();
+            emptyCpuGauge.start();
             // just return, no sleep
         }
     }
@@ -221,25 +206,23 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
 
     /**
      * Handle acker message
-     * 
-     * @see com.lmax.disruptor.EventHandler#onEvent(java.lang.Object, long,
-     *      boolean)
+     *
+     * @see EventHandler#onEvent(Object, long, boolean)
      */
     @Override
-    public void onEvent(Object event, long sequence, boolean endOfBatch)
-            throws Exception {
+    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
         long start = System.nanoTime();
         try {
-
             if (event == null) {
                 return;
             }
-
             Runnable runnable = null;
             if (event instanceof Tuple) {
+                processControlEvent();
                 runnable = processTupleEvent((Tuple) event);
             } else if (event instanceof BatchTuple) {
                 for (Tuple tuple : ((BatchTuple) event).getTuples()) {
+                    processControlEvent();
                     runnable = processTupleEvent(tuple);
                     if (runnable != null) {
                         runnable.run();
@@ -257,8 +240,7 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
                 runnable = (Runnable) event;
             } else {
 
-                LOG.warn("Receive one unknow event-" + event.toString() + " "
-                        + idStr);
+                LOG.warn("Receive one unknown event-" + event.toString() + " " + idStr);
                 return;
             }
 
@@ -272,42 +254,43 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
             }
         } finally {
             long end = System.nanoTime();
-            ackerTimer.update((end - start) / 1000000.0d);
+            ackerTimer.update((end - start) / TimeUtils.NS_PER_US);
         }
     }
 
     private Runnable processTupleEvent(Tuple event) {
-        Runnable runnable;
+        Runnable runnable = null;
         Tuple tuple = (Tuple) event;
-        Object id = tuple.getValue(0);
-        Object obj = pending.remove((Long) id);
-
-        if (obj == null) {
-            if (isDebug) {
-                LOG.info("Pending map no entry:" + id);
-            }
-            runnable = null;
+        if (event.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
+            TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) tuple.getValueByField("ctrlEvent");
+            taskTransfer.getBackpressureController().control(ctrlEvent);
         } else {
-            TupleInfo tupleInfo = (TupleInfo) obj;
+            Object id = tuple.getValue(0);
+            Object obj = pending.remove((Long) id);
 
-            String stream_id = tuple.getSourceStreamId();
+            if (obj == null) {
+                if (isDebug) {
+                    LOG.info("Pending map no entry:" + id);
+                }
+                runnable = null;
+            } else {
+                TupleInfo tupleInfo = (TupleInfo) obj;
 
-            if (stream_id.equals(Acker.ACKER_ACK_STREAM_ID)) {
+                String stream_id = tuple.getSourceStreamId();
 
-                runnable =
-                        new AckSpoutMsg(spout, tupleInfo, task_stats, isDebug);
-            } else if (stream_id.equals(Acker.ACKER_FAIL_STREAM_ID)) {
-                runnable =
-                        new FailSpoutMsg(id, spout, tupleInfo, task_stats,
-                                isDebug);
-            } else {
-                LOG.warn("Receive one unknow source Tuple " + idStr);
-                runnable = null;
+                if (stream_id.equals(Acker.ACKER_ACK_STREAM_ID)) {
+
+                    runnable = new AckSpoutMsg(spout, tuple, tupleInfo, task_stats, isDebug);
+                } else if (stream_id.equals(Acker.ACKER_FAIL_STREAM_ID)) {
+                    runnable = new FailSpoutMsg(id, spout, tupleInfo, task_stats, isDebug);
+                } else {
+                    LOG.warn("Receive one unknown source Tuple " + idStr);
+                    runnable = null;
+                }
             }
-        }
 
-        task_stats.recv_tuple(tuple.getSourceComponent(),
-                tuple.getSourceStreamId());
+            task_stats.recv_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId());
+        }
         return runnable;
     }
 
@@ -317,28 +300,23 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
 
     private void processTimerEvent(TimerTrigger.TimerEvent event) {
         switch (event.getOpCode()) {
-        case TimerConstants.ROTATING_MAP: {
-            Map<Long, TupleInfo> timeoutMap = pending.rotate();
-            for (java.util.Map.Entry<Long, TupleInfo> entry : timeoutMap
-                    .entrySet()) {
-                TupleInfo tupleInfo = entry.getValue();
-                FailSpoutMsg fail =
-                        new FailSpoutMsg(entry.getKey(), spout,
-                                (TupleInfo) tupleInfo, task_stats, isDebug);
-                fail.run();
+            case TimerConstants.ROTATING_MAP: {
+                Map<Long, TupleInfo> timeoutMap = pending.rotate();
+                for (Map.Entry<Long, TupleInfo> entry : timeoutMap.entrySet()) {
+                    TupleInfo tupleInfo = entry.getValue();
+                    FailSpoutMsg fail = new FailSpoutMsg(entry.getKey(), spout, (TupleInfo) tupleInfo, task_stats, isDebug);
+                    fail.run();
+                }
+                break;
+            }
+            case TimerConstants.TASK_HEARTBEAT: {
+                taskHbTrigger.setExeThreadHbTime(TimeUtils.current_time_secs());
+                break;
+            }
+            default: {
+                LOG.warn("Receive unsupported timer event, opcode=" + event.getOpCode());
+                break;
             }
-            break;
-        }
-        case TimerConstants.TASK_HEARTBEAT: {
-            Integer taskId = (Integer) event.getMsg();
-            TaskHeartbeatRunable.updateTaskHbStats(taskId, task);
-            break;
-        }
-        default: {
-            LOG.warn("Receive unsupported timer event, opcode="
-                    + event.getOpCode());
-            break;
-        }
         }
     }
 
@@ -349,9 +327,12 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
             if (event instanceof TimerTrigger.TimerEvent) {
                 processTimerEvent((TimerTrigger.TimerEvent) event);
             } else {
-                LOG.warn("Received unknown control event, "
-                        + event.getClass().getName());
+                LOG.warn("Received unknown control event, " + event.getClass().getName());
             }
         }
     }
+    
+    public Object getOutputCollector() {
+    	return outputCollector;
+    }
 }
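
[Editor's note] The hunk above captures this release's metrics API migration: the old per-task helpers (registerTaskHistogram/registerTaskGauge keyed by taskId) are replaced by a single registerTaskMetric keyed by a fully qualified metric name, with AsmHistogram/AsmGauge wrappers. A minimal sketch of the new registration pattern, using only calls visible in the diff; the fields topologyId, componentId, taskId and pending are assumed to come from the executor, as above:

    // Sketch of name-keyed metric registration (fields assumed, per the diff).
    AsmHistogram nextTupleTimer = (AsmHistogram) JStormMetrics.registerTaskMetric(
            MetricUtils.taskMetricName(topologyId, componentId, taskId,
                    MetricDef.EXECUTE_TIME, MetricType.HISTOGRAM),
            new AsmHistogram());

    // Gauges now wrap a codahale Gauge in an AsmGauge before registration.
    JStormMetrics.registerTaskMetric(
            MetricUtils.taskMetricName(topologyId, componentId, taskId,
                    MetricDef.PENDING_MAP, MetricType.GAUGE),
            new AsmGauge(new Gauge<Double>() {
                @Override
                public Double getValue() {
                    return (double) pending.size();
                }
            }));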

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java
index 968831b..b64bc30 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java
@@ -31,8 +31,7 @@ import com.alibaba.jstorm.utils.ExpiredCallback;
 import com.alibaba.jstorm.utils.JStormUtils;
 
 public class SpoutTimeoutCallBack<K, V> implements ExpiredCallback<K, V> {
-    private static Logger LOG = LoggerFactory
-            .getLogger(SpoutTimeoutCallBack.class);
+    private static Logger LOG = LoggerFactory.getLogger(SpoutTimeoutCallBack.class);
 
     private DisruptorQueue disruptorEventQueue;
     private backtype.storm.spout.ISpout spout;
@@ -40,16 +39,12 @@ public class SpoutTimeoutCallBack<K, V> implements ExpiredCallback<K, V> {
     private TaskBaseMetric task_stats;
     private boolean isDebug;
 
-    public SpoutTimeoutCallBack(DisruptorQueue disruptorEventQueue,
-            backtype.storm.spout.ISpout _spout, Map _storm_conf,
-            TaskBaseMetric stat) {
+    public SpoutTimeoutCallBack(DisruptorQueue disruptorEventQueue, backtype.storm.spout.ISpout _spout, Map _storm_conf, TaskBaseMetric stat) {
         this.storm_conf = _storm_conf;
         this.disruptorEventQueue = disruptorEventQueue;
         this.spout = _spout;
         this.task_stats = stat;
-        this.isDebug =
-                JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG),
-                        false);
+        this.isDebug = JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), false);
     }
 
     /**
@@ -62,9 +57,7 @@ public class SpoutTimeoutCallBack<K, V> implements ExpiredCallback<K, V> {
         }
         try {
             TupleInfo tupleInfo = (TupleInfo) val;
-            FailSpoutMsg fail =
-                    new FailSpoutMsg(key, spout, (TupleInfo) tupleInfo,
-                            task_stats, isDebug);
+            FailSpoutMsg fail = new FailSpoutMsg(key, spout, (TupleInfo) tupleInfo, task_stats, isDebug);
 
             disruptorEventQueue.publish(fail);
         } catch (Exception e) {
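
[Editor's note] For context, SpoutTimeoutCallBack is an ExpiredCallback: when an entry ages out of the spout's rotating pending map, expire() wraps it in a FailSpoutMsg and publishes it to the spout's disruptor queue. A hedged wiring sketch; the RotatingMap constructor and the Acker.TIMEOUT_BUCKET_NUM constant shown here are assumptions to check against the JStorm source:

    // Hypothetical wiring (RotatingMap constructor assumed):
    SpoutTimeoutCallBack<Long, TupleInfo> callback =
            new SpoutTimeoutCallBack<Long, TupleInfo>(exeQueue, spout, storm_conf, task_stats);
    RotatingMap<Long, TupleInfo> pending =
            new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM, callback);
    // On each bucket rotation, expired tuples reach callback.expire(key, val),
    // which enqueues a FailSpoutMsg so the spout's fail() path runs.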

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java
index bb6ad9c..46eefe8 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java
@@ -34,9 +34,7 @@ public class MkCustomGrouper {
 
     private int myTaskId;
 
-    public MkCustomGrouper(TopologyContext context,
-            CustomStreamGrouping _grouping, GlobalStreamId stream,
-            List<Integer> targetTask, int myTaskId) {
+    public MkCustomGrouper(TopologyContext context, CustomStreamGrouping _grouping, GlobalStreamId stream, List<Integer> targetTask, int myTaskId) {
         this.myTaskId = myTaskId;
         this.grouping = _grouping;
         this.grouping.prepare(context, stream, targetTask);
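
[Editor's note] MkCustomGrouper is a thin adapter: prepare() is called once with the context, stream id and target task list, after which each tuple is routed by the user grouping. A sketch of what such a CustomStreamGrouping might look like; the prepare parameter type follows the call above, and the chooseTasks signature is the standard backtype.storm one, so verify both against this JStorm version:

    import java.util.List;

    import backtype.storm.generated.GlobalStreamId;
    import backtype.storm.grouping.CustomStreamGrouping;
    import backtype.storm.task.TopologyContext;

    // Sketch: route every tuple to the first target task.
    public class FirstTaskGrouping implements CustomStreamGrouping {
        private List<Integer> targetTasks;

        @Override
        public void prepare(TopologyContext context, GlobalStreamId stream,
                List<Integer> targetTasks) {
            this.targetTasks = targetTasks; // saved once, reused per tuple
        }

        @Override
        public List<Integer> chooseTasks(int fromTaskId, List<Object> values) {
            return targetTasks.subList(0, 1);
        }
    }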

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java
index 3bf6518..66f2567 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java
@@ -26,17 +26,16 @@ import com.alibaba.jstorm.utils.JStormUtils;
 
 /**
  * field grouping
- * 
+ *
  * @author yannian
- * 
+ *
  */
 public class MkFieldsGrouper {
     private Fields out_fields;
     private Fields group_fields;
     private List<Integer> out_tasks;
 
-    public MkFieldsGrouper(Fields _out_fields, Fields _group_fields,
-            List<Integer> _out_tasks) {
+    public MkFieldsGrouper(Fields _out_fields, Fields _group_fields, List<Integer> _out_tasks) {
 
         for (Iterator<String> it = _group_fields.iterator(); it.hasNext();) {
             String groupField = it.next();
@@ -52,8 +51,7 @@ public class MkFieldsGrouper {
     }
 
     public List<Integer> grouper(List<Object> values) {
-        int hashcode =
-                this.out_fields.select(this.group_fields, values).hashCode();
+        int hashcode = this.out_fields.select(this.group_fields, values).hashCode();
         int group = Math.abs(hashcode % this.out_tasks.size());
         return JStormUtils.mk_list(out_tasks.get(group));
     }
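
[Editor's note] The grouper method above is plain modulo bucketing: hash the values selected by the grouping fields, then map the hash onto the target task list. The same arithmetic in isolation (a hypothetical helper, not JStorm code):

    // hashcode % numTasks lies in (-numTasks, numTasks); Math.abs folds it
    // into [0, numTasks), matching MkFieldsGrouper.grouper.
    static int pickTaskIndex(Object groupKey, int numTasks) {
        return Math.abs(groupKey.hashCode() % numTasks);
    }

Note that Math.abs is applied to the remainder, not the raw hash, so even an Integer.MIN_VALUE hash cannot produce a negative index.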

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java
index 5408afd..30d641c 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java
@@ -17,14 +17,6 @@
  */
 package com.alibaba.jstorm.task.group;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.generated.Grouping;
 import backtype.storm.generated.JavaObject;
@@ -32,11 +24,17 @@ import backtype.storm.grouping.CustomStreamGrouping;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.daemon.worker.WorkerData;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.RandomRange;
 import com.alibaba.jstorm.utils.Thrift;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
 
 /**
  * Grouper, get which task should be send to for one tuple
@@ -66,9 +64,8 @@ public class MkGrouper {
     private MkLocalShuffer local_shuffer_grouper;
     private MkLocalFirst localFirst;
 
-    public MkGrouper(TopologyContext _topology_context, Fields _out_fields,
-            Grouping _thrift_grouping, List<Integer> _outTasks,
-            String streamId, WorkerData workerData) {
+    public MkGrouper(TopologyContext _topology_context, Fields _out_fields, Grouping _thrift_grouping, List<Integer> _outTasks, String streamId,
+            WorkerData workerData) {
         this.topology_context = _topology_context;
         this.out_fields = _out_fields;
         this.thrift_grouping = _thrift_grouping;
@@ -83,8 +80,7 @@ public class MkGrouper {
         this.grouptype = this.parseGroupType(workerData);
 
         String id = _topology_context.getThisTaskId() + ":" + streamId;
-        LOG.info(id + " grouptype is " + grouptype + ", out_tasks is "
-                + out_tasks + ", local_tasks" + local_tasks);
+        LOG.info(id + " grouptype is " + grouptype + ", out_tasks is " + out_tasks + ", local_tasks" + local_tasks);
 
     }
 
@@ -104,12 +100,10 @@ public class MkGrouper {
                 grouperType = GrouperType.global;
             } else {
 
-                List<String> fields_group =
-                        Thrift.fieldGrouping(thrift_grouping);
+                List<String> fields_group = Thrift.fieldGrouping(thrift_grouping);
                 Fields fields = new Fields(fields_group);
 
-                fields_grouper =
-                        new MkFieldsGrouper(out_fields, fields, out_tasks);
+                fields_grouper = new MkFieldsGrouper(out_fields, fields, out_tasks);
 
                 // hashcode by fields
                 grouperType = GrouperType.fields;
@@ -132,29 +126,23 @@ public class MkGrouper {
             int myTaskId = topology_context.getThisTaskId();
             String componentId = topology_context.getComponentId(myTaskId);
             GlobalStreamId stream = new GlobalStreamId(componentId, streamId);
-            custom_grouper =
-                    new MkCustomGrouper(topology_context, g, stream, out_tasks,
-                            myTaskId);
+            custom_grouper = new MkCustomGrouper(topology_context, g, stream, out_tasks, myTaskId);
             grouperType = GrouperType.custom_obj;
         } else if (Grouping._Fields.CUSTOM_SERIALIZED.equals(fields)) {
             // user custom group by serialized Object
             byte[] obj = thrift_grouping.get_custom_serialized();
-            CustomStreamGrouping g =
-                    (CustomStreamGrouping) Utils.javaDeserialize(obj);
+            CustomStreamGrouping g = (CustomStreamGrouping) Utils.javaDeserialize(obj);
             int myTaskId = topology_context.getThisTaskId();
             String componentId = topology_context.getComponentId(myTaskId);
             GlobalStreamId stream = new GlobalStreamId(componentId, streamId);
-            custom_grouper =
-                    new MkCustomGrouper(topology_context, g, stream, out_tasks,
-                            myTaskId);
+            custom_grouper = new MkCustomGrouper(topology_context, g, stream, out_tasks, myTaskId);
             grouperType = GrouperType.custom_serialized;
         } else if (Grouping._Fields.DIRECT.equals(fields)) {
             // directly send to a special task
             grouperType = GrouperType.direct;
         } else if (Grouping._Fields.LOCAL_OR_SHUFFLE.equals(fields)) {
             grouperType = GrouperType.local_or_shuffle;
-            local_shuffer_grouper =
-                    new MkLocalShuffer(local_tasks, out_tasks, workerData);
+            local_shuffer_grouper = new MkLocalShuffer(local_tasks, out_tasks, workerData);
         } else if (Grouping._Fields.LOCAL_FIRST.equals(fields)) {
             grouperType = GrouperType.localFirst;
             localFirst = new MkLocalFirst(local_tasks, out_tasks, workerData);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java
index 56f9175..92fa18b 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java
@@ -40,8 +40,7 @@ import com.alibaba.jstorm.utils.RandomRange;
  * @version
  */
 public class MkLocalFirst extends Shuffer {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(MkLocalFirst.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MkLocalFirst.class);
 
     private List<Integer> allOutTasks = new ArrayList<Integer>();
     private List<Integer> localOutTasks = new ArrayList<Integer>();
@@ -52,8 +51,7 @@ public class MkLocalFirst extends Shuffer {
     private WorkerData workerData;
     private IntervalCheck intervalCheck;
 
-    public MkLocalFirst(List<Integer> workerTasks, List<Integer> allOutTasks,
-            WorkerData workerData) {
+    public MkLocalFirst(List<Integer> workerTasks, List<Integer> allOutTasks, WorkerData workerData) {
         super(workerData);
 
         intervalCheck = new IntervalCheck();
@@ -74,7 +72,6 @@ public class MkLocalFirst extends Shuffer {
         if (localWorkerOutTasks.size() != 0) {
             isLocalWorkerAvail = true;
             localOutTasks.addAll(localWorkerOutTasks);
-            remoteOutTasks.removeAll(localWorkerOutTasks);
         } else {
             isLocalWorkerAvail = false;
         }
@@ -93,8 +90,7 @@ public class MkLocalFirst extends Shuffer {
         for (i = 0; i < size; i++) {
             Integer taskId = outTasks.get(index);
             boolean taskStatus = workerData.isOutboundTaskActive(taskId);
-            DisruptorQueue exeQueue =
-                    (workerData.getInnerTaskTransfer().get(taskId));
+            DisruptorQueue exeQueue = (workerData.getInnerTaskTransfer().get(taskId));
             float queueLoadRatio = exeQueue != null ? exeQueue.pctFull() : 0;
             if (taskStatus && queueLoadRatio < 1.0)
                 break;
@@ -123,7 +119,6 @@ public class MkLocalFirst extends Shuffer {
         }
         return JStormUtils.mk_list(remoteOutTasks.get(index));
     }
-    
 
     public List<Integer> grouper(List<Object> values) {
         List<Integer> ret;
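
[Editor's note] Two things change in MkLocalFirst above: local tasks are no longer removed from remoteOutTasks, and task selection now also checks the receiver's queue load. Condensed, the selection loop reads roughly as follows (a sketch reassembled from the hunk; the surrounding fields are as in the class):

    // Probe random candidates; accept the first active task whose inbound
    // disruptor queue is not saturated (pctFull() < 1.0).
    int index = randomrange.nextInt();
    for (int i = 0; i < outTasks.size(); i++) {
        Integer taskId = outTasks.get(index);
        DisruptorQueue exeQueue = workerData.getInnerTaskTransfer().get(taskId);
        float queueLoadRatio = exeQueue != null ? exeQueue.pctFull() : 0;
        if (workerData.isOutboundTaskActive(taskId) && queueLoadRatio < 1.0) {
            break; // usable task found
        }
        index = randomrange.nextInt();
    }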

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java
index 324e1e6..c57d380 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java
@@ -1,37 +1,28 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package com.alibaba.jstorm.task.group;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
 
 import com.alibaba.jstorm.daemon.worker.WorkerData;
+import com.alibaba.jstorm.utils.IntervalCheck;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.RandomRange;
 
 public class MkLocalShuffer extends Shuffer {
+    private static final Logger LOG = Logger.getLogger(MkLocalShuffer.class);
 
     private List<Integer> outTasks;
     private RandomRange randomrange;
+    private Set<Integer> lastLocalNodeTasks;
+    private IntervalCheck intervalCheck;
+    private WorkerData workerData;
     private boolean isLocal;
 
     public MkLocalShuffer(List<Integer> workerTasks, List<Integer> allOutTasks,
-            WorkerData workerData) {
+                          WorkerData workerData) {
         super(workerData);
         List<Integer> localOutTasks = new ArrayList<Integer>();
 
@@ -40,6 +31,9 @@ public class MkLocalShuffer extends Shuffer {
                 localOutTasks.add(outTask);
             }
         }
+        this.workerData = workerData;
+        intervalCheck = new IntervalCheck();
+        intervalCheck.setInterval(60);
 
         if (localOutTasks.size() != 0) {
             this.outTasks = localOutTasks;
@@ -47,13 +41,43 @@ public class MkLocalShuffer extends Shuffer {
         } else {
             this.outTasks = new ArrayList<Integer>();
             this.outTasks.addAll(allOutTasks);
+            refreshLocalNodeTasks();
             isLocal = false;
         }
+        randomrange = new RandomRange(outTasks.size());
+    }
+
+    /**
+     * No need to worry about multiple threads: one task, one thread.
+     */
+    private void refreshLocalNodeTasks() {
+        Set<Integer> localNodeTasks = workerData.getLocalNodeTasks();
+
+        if (localNodeTasks == null || localNodeTasks.equals(lastLocalNodeTasks) ) {
+            return;
+        }
+        LOG.info("Old localNodeTasks:" + lastLocalNodeTasks + ", new:"
+                + localNodeTasks);
+        lastLocalNodeTasks = localNodeTasks;
+
+        List<Integer> localNodeOutTasks = new ArrayList<Integer>();
 
+        for (Integer outTask : outTasks) {
+            if (localNodeTasks.contains(outTask)) {
+                localNodeOutTasks.add(outTask);
+            }
+        }
+
+        if (!localNodeOutTasks.isEmpty()) {
+            this.outTasks = localNodeOutTasks;
+        }
         randomrange = new RandomRange(outTasks.size());
     }
 
     public List<Integer> grouper(List<Object> values) {
+        if (!isLocal && intervalCheck.check()) {
+            refreshLocalNodeTasks();
+        }
         int index = getActiveTask(randomrange, outTasks);
         // If none active tasks were found, still send message to a task
         if (index == -1)
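
[Editor's note] The new grouper above throttles refreshLocalNodeTasks() with an IntervalCheck set to 60 seconds, so the node-local task set is recomputed at most once a minute rather than per tuple. The throttling pattern in general form (refreshRoutingState is a hypothetical stand-in; assumes IntervalCheck.check() returns true only once the interval has elapsed):

    IntervalCheck refreshGate = new IntervalCheck();
    refreshGate.setInterval(60); // seconds, as in the constructor above

    void onEveryTuple() {
        if (refreshGate.check()) {
            refreshRoutingState(); // expensive work, at most once per interval
        }
        // ...fast per-tuple path continues here...
    }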

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java
index acad674..3d272bf 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java
@@ -38,8 +38,7 @@ public abstract class Shuffer {
         int i = 0;
 
         for (i = 0; i < size; i++) {
-            if (workerData.isOutboundTaskActive(Integer.valueOf(outTasks
-                    .get(index))))
+            if (workerData.isOutboundTaskActive(Integer.valueOf(outTasks.get(index))))
                 break;
             else
                 index = randomrange.nextInt();

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeat.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeat.java
deleted file mode 100755
index 532f553..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeat.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.task.heartbeat;
-
-import java.io.Serializable;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-/**
- * Task heartbeat, this Object will be updated to ZK timely
- * 
- * @author yannian
- * 
- */
-public class TaskHeartbeat implements Serializable {
-
-    private static final long serialVersionUID = -6369195955255963810L;
-    private Integer timeSecs;
-    private Integer uptimeSecs;
-
-    public TaskHeartbeat(int timeSecs, int uptimeSecs) {
-        this.timeSecs = timeSecs;
-        this.uptimeSecs = uptimeSecs;
-    }
-
-    public int getTimeSecs() {
-        return timeSecs;
-    }
-
-    @Override
-    public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
-    }
-
-    public void setTimeSecs(int timeSecs) {
-        this.timeSecs = timeSecs;
-    }
-
-    public int getUptimeSecs() {
-        return uptimeSecs;
-    }
-
-    public void setUptimeSecs(int uptimeSecs) {
-        this.uptimeSecs = uptimeSecs;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result =
-                prime * result + ((timeSecs == null) ? 0 : timeSecs.hashCode());
-        result =
-                prime * result
-                        + ((uptimeSecs == null) ? 0 : uptimeSecs.hashCode());
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        TaskHeartbeat other = (TaskHeartbeat) obj;
-        if (timeSecs == null) {
-            if (other.timeSecs != null)
-                return false;
-        } else if (!timeSecs.equals(other.timeSecs))
-            return false;
-        if (uptimeSecs == null) {
-            if (other.uptimeSecs != null)
-                return false;
-        } else if (!uptimeSecs.equals(other.uptimeSecs))
-            return false;
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatRunable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatRunable.java
deleted file mode 100644
index be66911..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatRunable.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.task.heartbeat;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-
-import com.alibaba.jstorm.callback.RunnableCallback;
-import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.daemon.worker.WorkerData;
-import com.alibaba.jstorm.schedule.Assignment.AssignmentType;
-import com.alibaba.jstorm.task.Task;
-import com.alibaba.jstorm.task.UptimeComputer;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.TimeUtils;
-
-/**
- * Task hearbeat
- * 
- * @author yannian
- * 
- */
-public class TaskHeartbeatRunable extends RunnableCallback {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(TaskHeartbeatRunable.class);
-
-    private StormClusterState zkCluster;
-    private String topology_id;
-    private UptimeComputer uptime;
-    private Map storm_conf;
-    private Integer frequence;
-    private Map<Integer, Long> taskAssignTsMap = new HashMap<Integer, Long>();
-
-    private static Map<Integer, TaskStats> taskStatsMap =
-            new HashMap<Integer, TaskStats>();
-    private static LinkedBlockingDeque<Event> eventQueue =
-            new LinkedBlockingDeque<TaskHeartbeatRunable.Event>();
-
-    public static void registerTaskStats(int taskId, TaskStats taskStats) {
-        Event event = new Event(Event.REGISTER_TYPE, taskId, taskStats);
-        eventQueue.offer(event);
-    }
-
-    public static void unregisterTaskStats(int taskId) {
-        Event event = new Event(Event.UNREGISTER_TYPE, taskId, null);
-        eventQueue.offer(event);
-    }
-
-    public static void updateTaskHbStats(int taskId, Task taskData) {
-        Event event = new Event(Event.TASK_HEARTBEAT_TYPE, taskId, taskData);
-        eventQueue.offer(event);
-    }
-
-    public TaskHeartbeatRunable(WorkerData workerData) {
-
-        this.zkCluster = workerData.getZkCluster();
-        this.topology_id = workerData.getTopologyId();
-        this.uptime = new UptimeComputer();
-        this.storm_conf = workerData.getStormConf();
-
-        String key = Config.TASK_HEARTBEAT_FREQUENCY_SECS;
-        Object time = storm_conf.get(key);
-        frequence = JStormUtils.parseInt(time, 10);
-
-    }
-
-    public void handle() throws InterruptedException {
-        Event event = eventQueue.take();
-        while (event != null) {
-            switch (event.getType()) {
-            case Event.TASK_HEARTBEAT_TYPE: {
-                updateTaskHbStats(event);
-                break;
-            }
-            case Event.REGISTER_TYPE: {
-                Event<TaskStats> regEvent = event;
-                taskStatsMap.put(event.getTaskId(), regEvent.getEventValue());
-                taskAssignTsMap.put(event.getTaskId(),
-                        System.currentTimeMillis());
-                break;
-            }
-            case Event.UNREGISTER_TYPE: {
-                taskStatsMap.remove(event.getTaskId());
-                taskAssignTsMap.remove(event.getTaskId());
-                break;
-            }
-            default: {
-                LOG.warn("Unknown event type received:" + event.getType());
-                break;
-            }
-            }
-
-            event = eventQueue.take();
-        }
-    }
-
-    @Override
-    public void run() {
-        try {
-            handle();
-        } catch (InterruptedException e) {
-            LOG.info(e.getMessage());
-        }
-    }
-
-    @Override
-    public Object getResult() {
-        return frequence;
-    }
-
-    public void updateTaskHbStats(Event event) {
-        Integer currtime = TimeUtils.current_time_secs();
-        Event<Task> taskHbEvent = event;
-        int taskId = taskHbEvent.getTaskId();
-        String idStr = " " + topology_id + ":" + taskId + " ";
-
-        try {
-
-            TaskHeartbeat hb = new TaskHeartbeat(currtime, uptime.uptime());
-            zkCluster.task_heartbeat(topology_id, taskId, hb);
-
-            LOG.info("update task hearbeat ts " + currtime + " for" + idStr);
-
-            // Check if assignment is changed. e.g scale-out
-            Task task = taskHbEvent.getEventValue();
-            Long timeStamp = taskAssignTsMap.get(taskId);
-            if (timeStamp != null) {
-                if (timeStamp < task.getWorkerAssignmentTs() && 
-                        task.getWorkerAssignmentType().equals(AssignmentType.Assign)) {
-                    LOG.info("Start to update the task data for task-" + taskId);
-                    task.updateTaskData();
-                    taskAssignTsMap.put(taskId, task.getWorkerAssignmentTs());
-                }
-            }
-        } catch (Exception e) {
-            // TODO Auto-generated catch block
-            String errMsg = "Failed to update heartbeat to ZK " + idStr + "\n";
-            LOG.error(errMsg, e);
-        }
-    }
-
-    private static class Event<T> {
-        public static final int REGISTER_TYPE = 0;
-        public static final int UNREGISTER_TYPE = 1;
-        public static final int TASK_HEARTBEAT_TYPE = 2;
-        private final int type;
-        private final int taskId;
-        private final T value;
-
-        public Event(int type, int taskId, T value) {
-            this.type = type;
-            this.taskId = taskId;
-            this.value = value;
-        }
-
-        public int getType() {
-            return type;
-        }
-
-        public int getTaskId() {
-            return taskId;
-        }
-
-        public T getEventValue() {
-            return value;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.java
new file mode 100644
index 0000000..86d72f4
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.heartbeat;
+
+import backtype.storm.generated.TaskHeartbeat;
+import backtype.storm.generated.TopologyTaskHbInfo;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.NimbusClient;
+
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.cluster.StormClusterState;
+import com.alibaba.jstorm.task.UptimeComputer;
+import com.alibaba.jstorm.utils.TimeUtils;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Updates the task heartbeat information of the topology to Nimbus
+ * 
+ * @author Basti Liu
+ * 
+ */
+public class TaskHeartbeatUpdater{
+    private static final Logger LOG = LoggerFactory
+            .getLogger(TaskHeartbeatUpdater.class);
+
+    private int MAX_NUM_TASK_HB_SEND;
+
+    private String topologyId;
+    private int taskId;
+    
+    private Map conf;
+    private NimbusClient client;
+
+    private Map<Integer, TaskHeartbeat> taskHbMap;
+    private TopologyTaskHbInfo taskHbs;
+
+    private StormClusterState zkCluster;
+    
+    public TaskHeartbeatUpdater(Map conf, String topologyId, int taskId, StormClusterState zkCluster) {
+        this.topologyId = topologyId;
+        this.taskId = taskId;
+
+        this.conf = conf;
+        this.client = NimbusClient.getConfiguredClient(conf);
+
+        this.zkCluster = zkCluster;
+        
+        try {
+            TopologyTaskHbInfo taskHbInfo = zkCluster.topology_heartbeat(topologyId);
+            if (taskHbInfo != null) {
+                LOG.info("Found task heartbeat info left in zk for " + topologyId + ": " + taskHbInfo.toString());
+                this.taskHbs = taskHbInfo;
+                this.taskHbMap = taskHbInfo.get_taskHbs();
+                if (this.taskHbMap == null) {
+                    this.taskHbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>();
+                    taskHbs.set_taskHbs(this.taskHbMap);
+                }
+                this.taskHbs.set_topologyId(topologyId);
+                this.taskHbs.set_topologyMasterId(this.taskId);
+            } else {
+                LOG.info("There is not any previous task heartbeat info left in zk for " + topologyId);
+                this.taskHbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>();
+                this.taskHbs = new TopologyTaskHbInfo(this.topologyId, this.taskId);
+                this.taskHbs.set_taskHbs(taskHbMap);
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to get topology heartbeat from zk", e);
+        }
+        this.MAX_NUM_TASK_HB_SEND = ConfigExtension.getTopologyTaskHbSendNumber(conf);
+    }
+
+    public void process(Tuple input) {
+        int sourceTask = input.getSourceTask();
+        int uptime = (Integer) input.getValue(0);
+        
+        // Update the heartbeat for source task
+        TaskHeartbeat taskHb = taskHbMap.get(sourceTask);
+        if (taskHb == null) {
+            taskHb = new TaskHeartbeat(TimeUtils.current_time_secs(), uptime);
+            taskHbMap.put(sourceTask, taskHb);
+        } else {
+            taskHb.set_time(TimeUtils.current_time_secs());
+            taskHb.set_uptime(uptime);
+        }
+
+        // Send heartbeat info of all tasks to nimbus
+        if (sourceTask == taskId) {
+            // Send heartbeat info of MAX_NUM_TASK_HB_SEND tasks each time
+            TopologyTaskHbInfo tmpTaskHbInfo = new TopologyTaskHbInfo(topologyId, taskId);
+            Map<Integer, TaskHeartbeat> tmpTaskHbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>();
+            tmpTaskHbInfo.set_taskHbs(tmpTaskHbMap);
+
+            int sendCount = 0;
+            for (Entry<Integer, TaskHeartbeat> entry : taskHbMap.entrySet()) {
+                tmpTaskHbMap.put(entry.getKey(), entry.getValue());
+                sendCount++;
+
+                if (sendCount >= MAX_NUM_TASK_HB_SEND) {
+                    setTaskHeatbeat(tmpTaskHbInfo);
+                    tmpTaskHbMap.clear();
+                    sendCount = 0;
+                }
+            }
+            if (tmpTaskHbMap.size() > 0) {
+                setTaskHeatbeat(tmpTaskHbInfo);
+            }
+        }
+    }
+    
+    private void setTaskHeatbeat(TopologyTaskHbInfo topologyTaskHbInfo) {
+        try {
+            if (topologyTaskHbInfo == null) {
+                return;
+            }
+            if (topologyTaskHbInfo.get_taskHbs() == null) {
+                return;
+            }
+
+            client.getClient().updateTaskHeartbeat(topologyTaskHbInfo);
+
+            String info = "";
+            for (Entry<Integer, TaskHeartbeat> entry : topologyTaskHbInfo.get_taskHbs().entrySet()) {
+                info += " " + entry.getKey() + "-" + entry.getValue().get_time(); 
+            }
+            LOG.info("Update task heartbeat:" + info);
+        } catch (TException e) {
+            LOG.error("Failed to update task heartbeat info", e);
+            if (client != null) {
+                client.close();
+                client = NimbusClient.getConfiguredClient(conf);
+            }
+        }
+    }
+}
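
[Editor's note] When the source task is the topology master itself, process() above flushes the accumulated heartbeats to Nimbus in chunks of MAX_NUM_TASK_HB_SEND. The chunking loop in generic form (sendBatch stands in for setTaskHeatbeat):

    int sendCount = 0;
    Map<Integer, TaskHeartbeat> batch = new ConcurrentHashMap<Integer, TaskHeartbeat>();
    for (Map.Entry<Integer, TaskHeartbeat> entry : taskHbMap.entrySet()) {
        batch.put(entry.getKey(), entry.getValue());
        if (++sendCount >= MAX_NUM_TASK_HB_SEND) {
            sendBatch(batch); // one Nimbus RPC per full chunk
            batch.clear();
            sendCount = 0;
        }
    }
    if (!batch.isEmpty()) {
        sendBatch(batch); // flush the remainder
    }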

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopoMasterCtrlEvent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopoMasterCtrlEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopoMasterCtrlEvent.java
new file mode 100644
index 0000000..adc8dc0
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopoMasterCtrlEvent.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.master;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Definition of a control event used for control purposes within a
+ * topology, e.g. back pressure
+ * 
+ * @author Basti Liu 
+ */
+
+public class TopoMasterCtrlEvent implements Serializable {
+
+    private static final long serialVersionUID = 5929540385279089750L;
+
+    public enum EventType {
+        startBackpressure, stopBackpressure, syncBackpressureState, updateBackpressureConfig, defaultType
+    }
+
+    private EventType eventType;
+    private List<Object> eventValue;
+
+    public TopoMasterCtrlEvent() {
+        eventType = EventType.defaultType;
+        eventValue = null;
+    }
+
+    public TopoMasterCtrlEvent(EventType type, List<Object> value) {
+        this.eventType = type;
+        this.eventValue = value;
+    }
+
+    public EventType getEventType() {
+        return eventType;
+    }
+
+    public void setEventType(EventType type) {
+        this.eventType = type;
+    }
+
+    public List<Object> getEventValue() {
+        return eventValue;
+    }
+
+    public void setEventValue(List<Object> value) {
+        this.eventValue = value;
+    }
+
+    public void addEventValue(Object value) {
+        if (eventValue == null) {
+            eventValue = new ArrayList<Object>();
+        }
+
+        eventValue.add(value);
+    }
+}
\ No newline at end of file

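A short usage sketch of TopoMasterCtrlEvent; the constructor, addEventValue and the getters are the API shown above, while the emit/receive plumbing over the topology master control stream is assumed and omitted (the task id 7 is purely illustrative):

    import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
    import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;

    public class CtrlEventExample {
        public static void main(String[] args) {
            // build a "start back pressure" event; the payload list is created lazily
            TopoMasterCtrlEvent event = new TopoMasterCtrlEvent(EventType.startBackpressure, null);
            event.addEventValue(7); // hypothetical source task id

            // a receiver typically dispatches on the event type
            switch (event.getEventType()) {
                case startBackpressure:
                    System.out.println("throttle senders, payload = " + event.getEventValue());
                    break;
                default:
                    break;
            }
        }
    }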
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopologyMaster.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopologyMaster.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopologyMaster.java
new file mode 100644
index 0000000..b5fb22c
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopologyMaster.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.master;
+
+import backtype.storm.generated.*;
+import backtype.storm.task.IBolt;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IDynamicComponent;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.NimbusClient;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.cluster.StormClusterState;
+import com.alibaba.jstorm.cluster.StormConfig;
+import com.alibaba.jstorm.metric.MetaType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.metric.TopologyMetricContext;
+import com.alibaba.jstorm.schedule.Assignment;
+import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
+import com.alibaba.jstorm.task.backpressure.BackpressureCoordinator;
+import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatUpdater;
+import com.alibaba.jstorm.utils.IntervalCheck;
+import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Topology master is responsible for processing general topology
+ * information, e.g. task heartbeat updates and metrics data updates.
+ *
+ * @author Basti Liu
+ */
+public class TopologyMaster implements IBolt, IDynamicComponent  {
+
+    private static final long serialVersionUID = 4690656768333833626L;
+
+    private static final Logger LOG = getLogger(TopologyMaster.class);
+    private final Logger metricLogger = getLogger(TopologyMetricContext.class);
+
+    public static final int MAX_BATCH_SIZE = 10000;
+    private final MetricInfo dummy = MetricUtils.mkMetricInfo();
+
+    public static final String FIELD_METRIC_WORKER = "worker";
+    public static final String FIELD_METRIC_METRICS = "metrics";
+    public static final String FIELD_HEARTBEAT_EVENT = "hbEvent";
+    public static final String FIELD_CTRL_EVENT = "ctrlEvent";
+
+    private Map conf;
+    private StormClusterState zkCluster;
+    private OutputCollector collector;
+
+    private int taskId;
+    private String topologyId;
+    private volatile Set<ResourceWorkerSlot> workerSet;
+    private IntervalCheck intervalCheck;
+
+    private TaskHeartbeatUpdater taskHeartbeatUpdater;
+
+    private BackpressureCoordinator backpressureCoordinator;
+
+    private TopologyMetricContext topologyMetricContext;
+
+    private ScheduledExecutorService uploadMetricsExecutor;
+
+    private Thread updateThread;
+    private BlockingQueue<Tuple> queue = new LinkedBlockingDeque<Tuple>();
+    private IntervalCheck threadAliveCheck;
+
+    private volatile boolean isActive = true;
+
+    private class TopologyMasterRunnable implements Runnable {
+        @Override
+        public void run() {
+            while (isActive) {
+                try {
+                    Tuple event = queue.take();
+                    if (event != null) {
+                        eventHandle(event);
+                    }
+                } catch (Throwable e) {
+                    LOG.error("Failed to process event", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context,
+                        OutputCollector collector) {
+        this.conf = context.getStormConf();
+        this.collector = collector;
+        this.taskId = context.getThisTaskId();
+        this.topologyId = context.getTopologyId();
+        this.zkCluster = context.getZkCluster();
+
+        try {
+            Assignment assignment = zkCluster.assignment_info(topologyId, null);
+            this.workerSet = assignment.getWorkers();
+            intervalCheck = new IntervalCheck();
+            intervalCheck.setInterval(10);
+            intervalCheck.start();
+        } catch (Exception e) {
+            LOG.error("Failed to get assignment for " + topologyId);
+        }
+
+        this.taskHeartbeatUpdater = new TaskHeartbeatUpdater(this.conf, topologyId, taskId, zkCluster);
+
+        this.backpressureCoordinator = new BackpressureCoordinator(collector, context, taskId);
+
+        this.topologyMetricContext = new TopologyMetricContext(topologyId, this.workerSet, this.conf);
+
+        this.uploadMetricsExecutor = Executors.newSingleThreadScheduledExecutor();
+        this.uploadMetricsExecutor.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                // align this run to second 35 of the minute before checking
+                // whether a forced merge & upload is needed
+                int secOffset = TimeUtils.secOffset();
+                int offset = 35;
+                if (secOffset < offset) {
+                    JStormUtils.sleepMs((offset - secOffset) * 1000);
+                } else if (secOffset == offset) {
+                    // do nothing
+                } else {
+                    JStormUtils.sleepMs((60 - secOffset + offset) * 1000);
+                }
+                if (topologyMetricContext.getUploadedWorkerNum() > 0) {
+                    metricLogger.info("force upload metrics.");
+                    mergeAndUpload();
+                }
+            }
+        }, 5, 60, TimeUnit.SECONDS);
+
+        updateThread = new Thread(new TopologyMasterRunnable());
+        updateThread.start();
+
+        threadAliveCheck = new IntervalCheck();
+        threadAliveCheck.setInterval(30);
+        threadAliveCheck.start();
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        if (input != null) {
+            try {
+                queue.put(input);
+            } catch (InterruptedException e) {
+                LOG.error("Failed to put event to taskHb updater's queue", e);
+            }
+
+            if (threadAliveCheck.check()) {
+                if (updateThread == null || !updateThread.isAlive()) {
+                    updateThread = new Thread(new TopologyMasterRunnable());
+                    updateThread.start();
+                }
+            }
+
+            collector.ack(input);
+        } else {
+            LOG.error("Received null tuple!");
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        isActive = false;
+        LOG.info("Successfully cleaned up");
+    }
+
+    private void updateTopologyWorkerSet() {
+        if (intervalCheck.check()) {
+            Assignment assignment;
+            try {
+                assignment = zkCluster.assignment_info(topologyId, null);
+                this.workerSet = assignment.getWorkers();
+            } catch (Exception e) {
+                LOG.error("Failed to get assignment for " + topologyId);
+            }
+
+        }
+    }
+
+    private void eventHandle(Tuple input) {
+        updateTopologyWorkerSet();
+
+        String stream = input.getSourceStreamId();
+
+        try {
+            if (stream.equals(Common.TOPOLOGY_MASTER_HB_STREAM_ID)) {
+                taskHeartbeatUpdater.process(input);
+            } else if (stream.equals(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID)) {
+                updateMetrics(input);
+            } else if (stream.equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
+                backpressureCoordinator.process(input);
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to handle event: " + input.toString(), e);
+        }
+    }
+
+    @Override
+    public void update(Map conf) {
+        LOG.info("Topology master received new conf:" + conf);
+
+        if (backpressureCoordinator.isBackpressureConfigChange(conf)) {
+            backpressureCoordinator.updateBackpressureConfig(conf);
+        }
+    }
+
+    private void updateMetrics(Tuple input) {
+        String workerSlot = (String) input.getValueByField(FIELD_METRIC_WORKER);
+        WorkerUploadMetrics metrics = (WorkerUploadMetrics) input.getValueByField(FIELD_METRIC_METRICS);
+        topologyMetricContext.addToMemCache(workerSlot, metrics.get_allMetrics());
+        metricLogger.info("received metrics from:{}, size:{}", workerSlot, metrics.get_allMetrics().get_metrics_size());
+
+        if (topologyMetricContext.readyToUpload()) {
+            metricLogger.info("all {} worker slots have updated metrics, start merging & uploading...",
+                    topologyMetricContext.getWorkerNum());
+            uploadMetricsExecutor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    mergeAndUpload();
+                }
+            });
+        }
+    }
+
+    private void mergeAndUpload() {
+        // double check
+        if (topologyMetricContext.getUploadedWorkerNum() > 0) {
+            TopologyMetric tpMetric = topologyMetricContext.mergeMetrics();
+            if (tpMetric != null) {
+                uploadMetrics(tpMetric);
+            }
+
+            topologyMetricContext.resetUploadedMetrics();
+            //MetricUtils.logMetrics(tpMetric.get_componentMetric());
+        }
+    }
+
+    /**
+     * upload metrics sequentially due to thrift frame size limit (15MB)
+     */
+    private void uploadMetrics(TopologyMetric tpMetric) {
+        long start = System.currentTimeMillis();
+        if (StormConfig.local_mode(conf)) {
+            return;
+        } else {
+            NimbusClient client = null;
+            try {
+                client = NimbusClient.getConfiguredClient(conf);
+                Nimbus.Client client1 = client.getClient();
+
+                MetricInfo topologyMetrics = tpMetric.get_topologyMetric();
+                MetricInfo componentMetrics = tpMetric.get_componentMetric();
+                MetricInfo taskMetrics = tpMetric.get_taskMetric();
+                MetricInfo streamMetrics = tpMetric.get_streamMetric();
+                MetricInfo workerMetrics = tpMetric.get_workerMetric();
+                MetricInfo nettyMetrics = tpMetric.get_nettyMetric();
+
+                int totalSize = topologyMetrics.get_metrics_size() + componentMetrics.get_metrics_size() +
+                        taskMetrics.get_metrics_size() + streamMetrics.get_metrics_size() +
+                        workerMetrics.get_metrics_size() + nettyMetrics.get_metrics_size();
+
+                // for small topologies, send all metrics together to ease the pressure of nimbus
+                if (totalSize < MAX_BATCH_SIZE) {
+                    client1.uploadTopologyMetrics(topologyId,
+                            new TopologyMetric(topologyMetrics, componentMetrics, workerMetrics, taskMetrics,
+                                    streamMetrics, nettyMetrics));
+                } else {
+                    client1.uploadTopologyMetrics(topologyId,
+                            new TopologyMetric(topologyMetrics, componentMetrics, dummy, dummy, dummy, dummy));
+                    batchUploadMetrics(client1, topologyId, workerMetrics, MetaType.WORKER);
+                    batchUploadMetrics(client1, topologyId, taskMetrics, MetaType.TASK);
+                    batchUploadMetrics(client1, topologyId, streamMetrics, MetaType.STREAM);
+                    batchUploadMetrics(client1, topologyId, nettyMetrics, MetaType.NETTY);
+                }
+            } catch (Exception e) {
+                LOG.error("Failed to upload worker metrics", e);
+            } finally {
+                if (client != null) {
+                    client.close();
+                }
+            }
+        }
+        metricLogger.info("upload metrics, cost:{}", System.currentTimeMillis() - start);
+    }
+
+    private void batchUploadMetrics(Nimbus.Client client, String topologyId, MetricInfo metricInfo, MetaType metaType) {
+        if (metricInfo.get_metrics_size() > MAX_BATCH_SIZE) {
+            Map<String, Map<Integer, MetricSnapshot>> data = metricInfo.get_metrics();
+
+            Map<String, Map<Integer, MetricSnapshot>> part = Maps.newHashMapWithExpectedSize(MAX_BATCH_SIZE);
+            MetricInfo uploadPart = new MetricInfo();
+            int i = 0;
+            for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : data.entrySet()) {
+                part.put(entry.getKey(), entry.getValue());
+                if (++i >= MAX_BATCH_SIZE) {
+                    uploadPart.set_metrics(part);
+                    doUpload(client, topologyId, uploadPart, metaType);
+
+                    i = 0;
+                    part.clear();
+                }
+            }
+            if (part.size() > 0) {
+                uploadPart.set_metrics(part);
+                doUpload(client, topologyId, uploadPart, metaType);
+            }
+        } else {
+            doUpload(client, topologyId, metricInfo, metaType);
+        }
+    }
+
+    private void doUpload(Nimbus.Client client, String topologyId, MetricInfo part, MetaType metaType) {
+        try {
+            if (metaType == MetaType.TASK) {
+                client.uploadTopologyMetrics(topologyId,
+                        new TopologyMetric(dummy, dummy, dummy, part, dummy, dummy));
+            } else if (metaType == MetaType.STREAM) {
+                client.uploadTopologyMetrics(topologyId,
+                        new TopologyMetric(dummy, dummy, dummy, dummy, part, dummy));
+            } else if (metaType == MetaType.WORKER) {
+                client.uploadTopologyMetrics(topologyId,
+                        new TopologyMetric(dummy, dummy, part, dummy, dummy, dummy));
+            } else if (metaType == MetaType.NETTY) {
+                client.uploadTopologyMetrics(topologyId,
+                        new TopologyMetric(dummy, dummy, dummy, dummy, dummy, part));
+            }
+        } catch (Exception ex) {
+            LOG.error("Error", ex);
+        }
+    }
+}

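The core of batchUploadMetrics above is slicing a large metrics map into parts of at most MAX_BATCH_SIZE entries so each thrift call stays under the frame-size limit. A self-contained sketch of that slicing pattern, with generic types and a toy callback standing in for MetricInfo and the Nimbus client:

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    public class BatchUploadSketch {
        // hand the data to the callback in parts of at most maxBatchSize entries
        static <K, V> void uploadInBatches(Map<K, V> data, int maxBatchSize, Consumer<Map<K, V>> upload) {
            Map<K, V> part = new LinkedHashMap<K, V>();
            for (Map.Entry<K, V> entry : data.entrySet()) {
                part.put(entry.getKey(), entry.getValue());
                if (part.size() >= maxBatchSize) {
                    upload.accept(part);              // flush a full batch
                    part = new LinkedHashMap<K, V>(); // fresh map, in case the callback keeps a reference
                }
            }
            if (!part.isEmpty()) {
                upload.accept(part);                  // flush the remainder
            }
        }

        public static void main(String[] args) {
            Map<String, Integer> metrics = new HashMap<String, Integer>();
            for (int i = 0; i < 25; i++) {
                metrics.put("metric-" + i, i);
            }
            // with a batch size of 10 this performs three uploads: 10 + 10 + 5 entries
            uploadInBatches(metrics, 10, part -> System.out.println("uploading " + part.size() + " metrics"));
        }
    }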
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java
index 4a4a72b..8f0138a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java
@@ -1,4 +1,4 @@
-/**
+package com.alibaba.jstorm.utils; /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java
index 17b7885..161156b 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java
@@ -17,67 +17,59 @@
  */
 package com.alibaba.jstorm.utils;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.utils.DisruptorQueue;
-
 import com.alibaba.jstorm.callback.AsyncLoopRunnable;
 import com.alibaba.jstorm.callback.RunnableCallback;
-import com.alibaba.jstorm.common.metric.QueueGauge;
-import com.alibaba.jstorm.common.metric.Timer;
-import com.alibaba.jstorm.metric.JStormHealthCheck;
+import com.alibaba.jstorm.common.metric.*;
 import com.alibaba.jstorm.metric.JStormMetrics;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.metric.JStormHealthCheck;
 import com.alibaba.jstorm.metric.MetricDef;
 import com.lmax.disruptor.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
 
 //import com.alibaba.jstorm.message.zeroMq.ISendConnection;
 
 /**
- * 
  * Disruptor Consumer thread
  * 
  * @author yannian
- * 
  */
-public abstract class DisruptorRunable extends RunnableCallback implements
-        EventHandler {
-    private final static Logger LOG = LoggerFactory
-            .getLogger(DisruptorRunable.class);
+public abstract class DisruptorRunable extends RunnableCallback implements EventHandler {
+    private final static Logger LOG = LoggerFactory.getLogger(DisruptorRunable.class);
 
     protected DisruptorQueue queue;
     protected String idStr;
-    protected Timer timer;
+    protected AsmHistogram timer;
     protected AtomicBoolean shutdown = AsyncLoopRunnable.getShutdown();
 
     public DisruptorRunable(DisruptorQueue queue, String idStr) {
         this.queue = queue;
-        this.timer =
-                JStormMetrics.registerWorkerTimer(idStr + MetricDef.TIME_TYPE);
         this.idStr = idStr;
 
-        QueueGauge queueGauge =
-                new QueueGauge(idStr + MetricDef.QUEUE_TYPE, queue);
-        JStormMetrics.registerWorkerGauge(queueGauge, idStr
-                + MetricDef.QUEUE_TYPE);
+        this.timer =
+                (AsmHistogram) JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName(idStr + MetricDef.TIME_TYPE, MetricType.HISTOGRAM),
+                        new AsmHistogram());
+
+        QueueGauge queueGauge = new QueueGauge(queue, idStr, MetricDef.QUEUE_TYPE);
+        JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName(idStr + MetricDef.QUEUE_TYPE, MetricType.GAUGE), new AsmGauge(queueGauge));
 
         JStormHealthCheck.registerWorkerHealthCheck(idStr, queueGauge);
     }
 
-    public abstract void handleEvent(Object event, boolean endOfBatch)
-            throws Exception;
+    public abstract void handleEvent(Object event, boolean endOfBatch) throws Exception;
 
     /**
      * This function needs to be implemented
      * 
-     * @see com.lmax.disruptor.EventHandler#onEvent(java.lang.Object, long,
-     *      boolean)
+     * @see EventHandler#onEvent(Object, long, boolean)
      */
     @Override
-    public void onEvent(Object event, long sequence, boolean endOfBatch)
-            throws Exception {
+    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
         if (event == null) {
             return;
         }
@@ -87,7 +79,7 @@ public abstract class DisruptorRunable extends RunnableCallback implements
             handleEvent(event, endOfBatch);
         } finally {
             long end = System.nanoTime();
-            timer.update((end - start)/1000000.0d);
+            timer.update((end - start) / TimeUtils.NS_PER_US);
         }
     }
 
@@ -96,17 +88,15 @@ public abstract class DisruptorRunable extends RunnableCallback implements
         LOG.info("Successfully start thread " + idStr);
         queue.consumerStarted();
 
-        while (shutdown.get() == false) {
+        while (!shutdown.get()) {
             queue.consumeBatchWhenAvailable(this);
-
         }
-
         LOG.info("Successfully exit thread " + idStr);
     }
 
     @Override
     public void shutdown() {
-        JStormMetrics.unregisterWorkerMetric(idStr + MetricDef.QUEUE_TYPE);
+        JStormMetrics.unregisterWorkerMetric(MetricUtils.workerMetricName(idStr + MetricDef.QUEUE_TYPE, MetricType.GAUGE));
         JStormHealthCheck.unregisterWorkerHealthCheck(idStr);
     }
 

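The onEvent wrapper above times every handled event and feeds the elapsed microseconds into a histogram. A stripped-down sketch of the same measure-around-handler pattern, with a plain accumulator in place of AsmHistogram (NS_PER_US = 1000 is an assumption mirroring TimeUtils.NS_PER_US):

    public class TimedHandlerSketch {
        private static final long NS_PER_US = 1000L; // assumed value of TimeUtils.NS_PER_US

        private double totalUs = 0;
        private long count = 0;

        // real work would happen here; DisruptorRunable leaves this abstract
        protected void handleEvent(Object event, boolean endOfBatch) throws Exception {
        }

        public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
            if (event == null) {
                return;
            }
            long start = System.nanoTime();
            try {
                handleEvent(event, endOfBatch);
            } finally {
                // record the elapsed time even if the handler throws
                totalUs += (System.nanoTime() - start) / (double) NS_PER_US;
                count++;
            }
        }

        public double meanUs() {
            return count == 0 ? 0 : totalUs / count;
        }
    }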
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java
index 8c62d6f..2773963 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java
@@ -17,34 +17,18 @@
  */
 package com.alibaba.jstorm.utils;
 
-public enum EPlatform {  
-    Any("any"),  
-    Linux("Linux"),  
-    Mac_OS("Mac OS"),  
-    Mac_OS_X("Mac OS X"),  
-    Windows("Windows"),  
-    OS2("OS/2"),  
-    Solaris("Solaris"),  
-    SunOS("SunOS"),  
-    MPEiX("MPE/iX"),  
-    HP_UX("HP-UX"),  
-    AIX("AIX"),  
-    OS390("OS/390"),  
-    FreeBSD("FreeBSD"),  
-    Irix("Irix"),  
-    Digital_Unix("Digital Unix"),  
-    NetWare_411("NetWare"),  
-    OSF1("OSF1"),  
-    OpenVMS("OpenVMS"),  
-    Others("Others");  
-      
-    private EPlatform(String desc){  
-        this.description = desc;  
-    }  
-      
-    public String toString(){  
-        return description;  
-    }  
-      
-    private String description;  
-} 
+public enum EPlatform {
+    Any("any"), Linux("Linux"), Mac_OS("Mac OS"), Mac_OS_X("Mac OS X"), Windows("Windows"), OS2("OS/2"), Solaris("Solaris"), SunOS("SunOS"), MPEiX("MPE/iX"), HP_UX(
+            "HP-UX"), AIX("AIX"), OS390("OS/390"), FreeBSD("FreeBSD"), Irix("Irix"), Digital_Unix("Digital Unix"), NetWare_411("NetWare"), OSF1("OSF1"), OpenVMS(
+            "OpenVMS"), Others("Others");
+
+    private EPlatform(String desc) {
+        this.description = desc;
+    }
+
+    @Override
+    public String toString() {
+        return description;
+    }
+
+    private String description;
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java
index e33167a..7099169 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java
@@ -77,8 +77,7 @@ public class FileAttribute implements Serializable, JSONAware {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 
     @Override
@@ -122,8 +121,7 @@ public class FileAttribute implements Serializable, JSONAware {
 
         String jsonString = JStormUtils.to_json(map);
 
-        Map<String, Map> map2 =
-                (Map<String, Map>) JStormUtils.from_json(jsonString);
+        Map<String, Map> map2 = (Map<String, Map>) JStormUtils.from_json(jsonString);
 
         Map jObject = map2.get("test");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
index 20c1f7a..378ee26 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
@@ -29,8 +29,7 @@ public class HttpserverUtils {
 
     public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_JSTACK = "jstack";
 
-    public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF =
-            "showConf";
+    public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF = "showConf";
 
     public static final String HTTPSERVER_LOGVIEW_PARAM_LOGFILE = "log";
 
@@ -38,8 +37,7 @@ public class HttpserverUtils {
 
     public static final String HTTPSERVER_LOGVIEW_PARAM_DIR = "dir";
 
-    public static final String HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT =
-            "workerPort";
+    public static final String HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT = "workerPort";
 
     public static final String HTTPSERVER_LOGVIEW_PARAM_SIZE_FORMAT = "%016d\n";
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java
index 992659c..de7b504 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java
@@ -1,83 +1,74 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.utils;
-
-import java.io.Serializable;
-
-public class IntervalCheck implements Serializable {
-
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = 8952971673547362883L;
-
-    long lastCheck = System.currentTimeMillis();
-
-    // default interval is 1 second
-    long interval = 1000;
-
-    /*
-     * if last check time is before interval seconds, return true, otherwise
-     * return false
-     */
-    public boolean check() {
-        return checkAndGet() != null;
-    }
-
-    /**
-     * 
-     * @return
-     */
-    public Double checkAndGet() {
-        long now = System.currentTimeMillis();
-
-        synchronized (this) {
-            if (now >= interval + lastCheck) {
-                double pastSecond = ((double) (now - lastCheck)) / 1000;
-                lastCheck = now;
-                return pastSecond;
-            }
-        }
-
-        return null;
-    }
-
-    public long getInterval() {
-        return interval/1000;
-    }
-    
-    public long getIntervalMs() {
-        return interval;
-    }
-
-    public void setInterval(long interval) {
-        this.interval = interval * 1000;
-    }
-    
-    public void setIntervalMs(long interval) {
-        this.interval = interval;
-    }
-
-    public void adjust(long addTimeMillis) {
-        lastCheck += addTimeMillis;
-    }
-
-    public void start() {
-        lastCheck = System.currentTimeMillis();
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.utils;
+
+import java.io.Serializable;
+
+public class IntervalCheck implements Serializable {
+    private static final long serialVersionUID = 8952971673547362883L;
+
+    long lastCheck = System.currentTimeMillis();
+
+    // default interval is 1 second
+    long interval = 1000;
+
+    /*
+     * Returns true if at least the configured interval has elapsed since the
+     * last check (and resets the timer); otherwise returns false.
+     */
+    public boolean check() {
+        return checkAndGet() != null;
+    }
+
+    public Double checkAndGet() {
+        long now = System.currentTimeMillis();
+
+        synchronized (this) {
+            if (now >= interval + lastCheck) {
+                double pastSecond = ((double) (now - lastCheck)) / 1000;
+                lastCheck = now;
+                return pastSecond;
+            }
+        }
+
+        return null;
+    }
+
+    public long getInterval() {
+        return interval / 1000;
+    }
+
+    public long getIntervalMs() {
+        return interval;
+    }
+
+    public void setInterval(long interval) {
+        this.interval = interval * 1000;
+    }
+
+    public void setIntervalMs(long interval) {
+        this.interval = interval;
+    }
+
+    public void adjust(long addTimeMillis) {
+        lastCheck += addTimeMillis;
+    }
+
+    public void start() {
+        lastCheck = System.currentTimeMillis();
+    }
+}
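
Finally, a brief usage sketch of IntervalCheck as a cheap rate limiter; every call below is part of the API shown above:

    import com.alibaba.jstorm.utils.IntervalCheck;

    public class IntervalCheckExample {
        public static void main(String[] args) throws InterruptedException {
            IntervalCheck check = new IntervalCheck();
            check.setIntervalMs(200); // fire at most once every 200 ms
            check.start();

            for (int i = 0; i < 10; i++) {
                Thread.sleep(50);
                Double elapsedSec = check.checkAndGet();
                if (elapsedSec != null) {
                    // non-null roughly every 4th iteration, once ~0.2 s has elapsed
                    System.out.printf("tick after %.2f s%n", elapsedSec);
                }
            }
        }
    }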