You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:49 UTC

[11/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java
index da69070..0cc16e4 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -17,27 +18,6 @@
  */
 package com.alibaba.jstorm.daemon.worker;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URL;
-import java.security.InvalidParameterException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.generated.InvalidTopologyException;
 import backtype.storm.generated.StormTopology;
@@ -48,28 +28,38 @@ import backtype.storm.scheduler.WorkerSlot;
 import backtype.storm.utils.DisruptorQueue;
 import backtype.storm.utils.Utils;
 import backtype.storm.utils.WorkerClassLoader;
-
 import com.alibaba.jstorm.callback.AsyncLoopDefaultKill;
+import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.Cluster;
-import com.alibaba.jstorm.cluster.ClusterState;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.common.metric.window.Metric;
+import com.alibaba.jstorm.cluster.*;
+import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.common.metric.AsmMetric;
 import com.alibaba.jstorm.daemon.nimbus.StatusType;
 import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
 import com.alibaba.jstorm.message.netty.ControlMessage;
+import com.alibaba.jstorm.metric.*;
 import com.alibaba.jstorm.schedule.Assignment;
-import com.alibaba.jstorm.schedule.Assignment.AssignmentType;
 import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
 import com.alibaba.jstorm.task.TaskShutdownDameon;
 import com.alibaba.jstorm.utils.JStormServerUtils;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.zk.ZkTool;
+import com.codahale.metrics.Gauge;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.security.InvalidParameterException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.alibaba.jstorm.schedule.Assignment.AssignmentType;
 
 public class WorkerData {
 
@@ -112,6 +102,8 @@ public class WorkerData {
 
     private volatile Set<Integer> outboundTasks;
     private Set<Integer> localTasks;
+    private Set<Integer> localNodeTasks;
+
 
     private ConcurrentHashMap<Integer, DisruptorQueue> innerTaskTransfer;
     private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues;
@@ -152,15 +144,18 @@ public class WorkerData {
     private ScheduledExecutorService threadPool;
 
     private volatile Long assignmentTS; // Assignment timeStamp. The time of
-                                        // last update of assignment
+    // last update of assignment
+
     private volatile AssignmentType assignmentType;
-    
+
     private IConnection recvConnection;
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public WorkerData(Map conf, IContext context, String topology_id,
-            String supervisor_id, int port, String worker_id, String jar_path)
-            throws Exception {
+    private JStormMetricsReporter metricReporter;
+
+    private AsyncLoopThread healthReporterThread;
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public WorkerData(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path) throws Exception {
 
         this.conf = conf;
         this.context = context;
@@ -170,7 +165,7 @@ public class WorkerData {
         this.workerId = worker_id;
 
         this.shutdown = new AtomicBoolean(false);
-        
+
         this.monitorEnable = new AtomicBoolean(true);
         this.topologyStatus = StatusType.active;
 
@@ -183,29 +178,54 @@ public class WorkerData {
         this.zkClusterstate = ZkTool.mk_distributed_cluster_state(conf);
         this.zkCluster = Cluster.mk_storm_cluster_state(zkClusterstate);
 
-        Map rawConf =
-                StormConfig.read_supervisor_topology_conf(conf, topology_id);
+        Map rawConf = StormConfig.read_supervisor_topology_conf(conf, topology_id);
         this.stormConf = new HashMap<Object, Object>();
         this.stormConf.putAll(conf);
         this.stormConf.putAll(rawConf);
-        
+
+        JStormMetrics.setTopologyId(topology_id);
+        JStormMetrics.setPort(port);
+        JStormMetrics.setDebug(ConfigExtension.isEnableMetricDebug(stormConf));
+        JStormMetrics.setEnabled(ConfigExtension.isEnableMetrics(stormConf));
+        JStormMetrics.addDebugMetrics(ConfigExtension.getDebugMetricNames(stormConf));
+        AsmMetric.setSampleRate(ConfigExtension.getMetricSampleRate(stormConf));
+
         ConfigExtension.setLocalSupervisorId(stormConf, supervisorId);
         ConfigExtension.setLocalWorkerId(stormConf, workerId);
         ConfigExtension.setLocalWorkerPort(stormConf, port);
         ControlMessage.setPort(port);
-        Metric.setEnable(ConfigExtension.isEnablePerformanceMetrics(stormConf));
+
+        JStormMetrics.registerWorkerTopologyMetric(
+                JStormMetrics.workerMetricName(MetricDef.CPU_USED_RATIO, MetricType.GAUGE),
+                new AsmGauge(new Gauge<Double>() {
+                    @Override
+                    public Double getValue() {
+                        return JStormUtils.getCpuUsage();
+                    }
+                }));
+
+        JStormMetrics.registerWorkerTopologyMetric(JStormMetrics.workerMetricName(MetricDef.MEMORY_USED, MetricType.GAUGE),
+                new AsmGauge(new Gauge<Double>() {
+                    @Override
+                    public Double getValue() {
+                        return JStormUtils.getMemUsage();
+                    }
+                }));
+
+        JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName(MetricDef.DISK_USAGE, MetricType.GAUGE), new AsmGauge(new Gauge<Double>() {
+            @Override
+            public Double getValue() {
+                return JStormUtils.getDiskUsage();
+            }
+        }));
 
         LOG.info("Worker Configuration " + stormConf);
 
         try {
+            boolean enableClassloader = ConfigExtension.isEnableTopologyClassLoader(stormConf);
+            boolean enableDebugClassloader = ConfigExtension.isEnableClassloaderDebug(stormConf);
 
-            boolean enableClassloader =
-                    ConfigExtension.isEnableTopologyClassLoader(stormConf);
-            boolean enableDebugClassloader =
-                    ConfigExtension.isEnableClassloaderDebug(stormConf);
-
-            if (jar_path == null && enableClassloader == true
-                    && !conf.get(Config.STORM_CLUSTER_MODE).equals("local")) {
+            if (jar_path == null && enableClassloader == true && !conf.get(Config.STORM_CLUSTER_MODE).equals("local")) {
                 LOG.error("enable classloader, but not app jar");
                 throw new InvalidParameterException();
             }
@@ -221,14 +241,11 @@ public class WorkerData {
                     urls.add(url);
                 }
                 urlArray = urls.toArray(new URL[0]);
-
             }
 
-            WorkerClassLoader.mkInstance(urlArray, ClassLoader
-                    .getSystemClassLoader(), ClassLoader.getSystemClassLoader()
-                    .getParent(), enableClassloader, enableDebugClassloader);
+            WorkerClassLoader.mkInstance(urlArray, ClassLoader.getSystemClassLoader(), ClassLoader.getSystemClassLoader().getParent(), enableClassloader,
+                    enableDebugClassloader);
         } catch (Exception e) {
-            // TODO Auto-generated catch block
             LOG.error("init jarClassLoader error!", e);
             throw new InvalidParameterException();
         }
@@ -237,39 +254,27 @@ public class WorkerData {
             this.context = TransportFactory.makeContext(stormConf);
         }
 
-        boolean disruptorUseSleep =
-                ConfigExtension.isDisruptorUseSleep(stormConf);
+        boolean disruptorUseSleep = ConfigExtension.isDisruptorUseSleep(stormConf);
         DisruptorQueue.setUseSleep(disruptorUseSleep);
-        boolean isLimited =
-                ConfigExtension.getTopologyBufferSizeLimited(stormConf);
+        boolean isLimited = ConfigExtension.getTopologyBufferSizeLimited(stormConf);
         DisruptorQueue.setLimited(isLimited);
-        LOG.info("Disruptor use sleep:" + disruptorUseSleep + ", limited size:"
-                + isLimited);
+        LOG.info("Disruptor use sleep:" + disruptorUseSleep + ", limited size:" + isLimited);
 
         // this.transferQueue = new LinkedBlockingQueue<TransferData>();
-        int buffer_size =
-                Utils.getInt(conf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE));
-        WaitStrategy waitStrategy =
-                (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(conf);
-        this.transferQueue =
-                DisruptorQueue.mkInstance("TotalTransfer", ProducerType.MULTI,
-                        buffer_size, waitStrategy);
+        int buffer_size = Utils.getInt(stormConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE));
+        WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf);
+        this.transferQueue = DisruptorQueue.mkInstance("TotalTransfer", ProducerType.MULTI, buffer_size, waitStrategy);
         this.transferQueue.consumerStarted();
-        this.sendingQueue =
-                DisruptorQueue.mkInstance("TotalSending", ProducerType.MULTI,
-                        buffer_size, waitStrategy);
+        this.sendingQueue = DisruptorQueue.mkInstance("TotalSending", ProducerType.MULTI, buffer_size, waitStrategy);
         this.sendingQueue.consumerStarted();
 
         this.nodeportSocket = new ConcurrentHashMap<WorkerSlot, IConnection>();
         this.taskNodeport = new ConcurrentHashMap<Integer, WorkerSlot>();
         this.workerToResource = new ConcurrentSkipListSet<ResourceWorkerSlot>();
-        this.innerTaskTransfer =
-                new ConcurrentHashMap<Integer, DisruptorQueue>();
-        this.deserializeQueues =
-                new ConcurrentHashMap<Integer, DisruptorQueue>();
+        this.innerTaskTransfer = new ConcurrentHashMap<Integer, DisruptorQueue>();
+        this.deserializeQueues = new ConcurrentHashMap<Integer, DisruptorQueue>();
         this.tasksToComponent = new ConcurrentHashMap<Integer, String>();
-        this.componentToSortedTasks =
-                new ConcurrentHashMap<String, List<Integer>>();
+        this.componentToSortedTasks = new ConcurrentHashMap<String, List<Integer>>();
 
         Assignment assignment = zkCluster.assignment_info(topologyId, null);
         if (assignment == null) {
@@ -288,8 +293,7 @@ public class WorkerData {
         LOG.info("Current worker taskList:" + taskids);
 
         // deserialize topology code from local dir
-        rawTopology =
-                StormConfig.read_supervisor_topology_code(conf, topology_id);
+        rawTopology = StormConfig.read_supervisor_topology_code(conf, topology_id);
         sysTopology = Common.system_topology(stormConf, rawTopology);
 
         generateMaps();
@@ -301,15 +305,17 @@ public class WorkerData {
         threadPool = Executors.newScheduledThreadPool(THREAD_POOL_NUM);
         TimerTrigger.setScheduledExecutorService(threadPool);
 
+        if (!StormConfig.local_mode(stormConf)) {
+            healthReporterThread = new AsyncLoopThread(new JStormHealthReporter(this));
+        }
+
         try {
-            Long tmp =
-                    StormConfig.read_supervisor_topology_timestamp(conf,
-                            topology_id);
+            Long tmp = StormConfig.read_supervisor_topology_timestamp(conf, topology_id);
             assignmentTS = (tmp == null ? System.currentTimeMillis() : tmp);
         } catch (FileNotFoundException e) {
             assignmentTS = System.currentTimeMillis();
         }
-        
+
         outboundTasks = new HashSet<Integer>();
 
         LOG.info("Successfully create WorkerData");
@@ -317,13 +323,10 @@ public class WorkerData {
     }
 
     /**
-     * private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport; private
-     * HashMap<Integer, String> tasksToComponent; private Map<String,
-     * List<Integer>> componentToSortedTasks; private Map<String, Map<String,
-     * Fields>> componentToStreamToFields; private Map<String, Object>
-     * defaultResources; private Map<String, Object> userResources; private
-     * Map<String, Object> executorData; private Map registeredMetrics;
-     * 
+     * private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport; private HashMap<Integer, String> tasksToComponent; private Map<String, List<Integer>>
+     * componentToSortedTasks; private Map<String, Map<String, Fields>> componentToStreamToFields; private Map<String, Object> defaultResources; private
+     * Map<String, Object> userResources; private Map<String, Object> executorData; private Map registeredMetrics;
+     *
      * @throws Exception
      */
     private void generateMaps() throws Exception {
@@ -335,7 +338,7 @@ public class WorkerData {
         this.registeredMetrics = new HashMap();
     }
 
-    public Map<Object, Object> getConf() {
+    public Map<Object, Object> getRawConf() {
         return conf;
     }
 
@@ -351,6 +354,10 @@ public class WorkerData {
         this.topologyStatus = topologyStatus;
     }
 
+    public Map<Object, Object> getConf() {
+        return stormConf;
+    }
+
     public Map<Object, Object> getStormConf() {
         return stormConf;
     }
@@ -396,7 +403,19 @@ public class WorkerData {
     }
 
     public ConcurrentSkipListSet<ResourceWorkerSlot> getWorkerToResource() {
-        return workerToResource;
+        synchronized (workerToResource) {
+            return workerToResource;
+        }
+    }
+
+    public void updateWorkerToResource(Set<ResourceWorkerSlot> workers) {
+        synchronized (workerToResource) {
+            // Evict slots that are absent from the supplied set, then add the supplied ones.
+            Set<ResourceWorkerSlot> oldWorkers = workerToResource.clone();
+            oldWorkers.removeAll(workers);
+            workerToResource.removeAll(oldWorkers);
+            workerToResource.addAll(workers);
+        }
     }
 
     public ConcurrentHashMap<Integer, DisruptorQueue> getInnerTaskTransfer() {
@@ -471,8 +490,7 @@ public class WorkerData {
         this.shutdownTasks.add(shutdownTask);
     }
 
-    public List<TaskShutdownDameon> getShutdownDaemonbyTaskIds(
-            Set<Integer> taskIds) {
+    public List<TaskShutdownDameon> getShutdownDaemonbyTaskIds(Set<Integer> taskIds) {
         List<TaskShutdownDameon> ret = new ArrayList<TaskShutdownDameon>();
         for (TaskShutdownDameon shutdown : shutdownTasks) {
             if (taskIds.contains(shutdown.getTaskId()))
@@ -494,7 +512,7 @@ public class WorkerData {
             outTaskStatus.put(taskId, false);
         }
     }
-    
+
     public Map<Integer, Boolean> getOutboundTaskStatus() {
         return outTaskStatus;
     }
@@ -502,7 +520,7 @@ public class WorkerData {
     public void addOutboundTaskStatusIfAbsent(Integer taskId) {
         outTaskStatus.putIfAbsent(taskId, false);
     }
-    
+
     public void removeOutboundTaskStatus(Integer taskId) {
         outTaskStatus.remove(taskId);
     }
@@ -512,8 +530,7 @@ public class WorkerData {
     }
 
     public boolean isOutboundTaskActive(Integer taskId) {
-        return outTaskStatus.get(taskId) != null ? outTaskStatus.get(taskId)
-                : false;
+        return outTaskStatus.get(taskId) != null ? outTaskStatus.get(taskId) : false;
     }
 
     public ScheduledExecutorService getThreadPool() {
@@ -527,11 +544,11 @@ public class WorkerData {
     public Long getAssignmentTs() {
         return assignmentTS;
     }
-    
+
     public void setAssignmentType(AssignmentType type) {
         this.assignmentType = type;
     }
-    
+
     public AssignmentType getAssignmentType() {
         return assignmentType;
     }
@@ -544,28 +561,33 @@ public class WorkerData {
 
     public void updateTaskIds(Assignment assignment) {
         this.taskids.clear();
-        this.taskids.addAll(assignment
-                .getCurrentWorkerTasks(supervisorId, port));
+        this.taskids.addAll(assignment.getCurrentWorkerTasks(supervisorId, port));
+    }
+
+    public Set<Integer> getLocalNodeTasks() {
+        return localNodeTasks;
+    }
+
+    public void setLocalNodeTasks(Set<Integer> localNodeTasks) {
+        this.localNodeTasks = localNodeTasks;
     }
 
     public void setOutboundTasks(Set<Integer> outboundTasks) {
         this.outboundTasks = outboundTasks;
     }
-    
+
     public Set<Integer> getOutboundTasks() {
         return outboundTasks;
     }
 
     private void updateTaskComponentMap() throws Exception {
-        Map<Integer, String> tmp = Common.getTaskToComponent(
-                        Cluster.get_all_taskInfo(zkCluster, topologyId));
+        Map<Integer, String> tmp = Common.getTaskToComponent(Cluster.get_all_taskInfo(zkCluster, topologyId));
 
         this.tasksToComponent.putAll(tmp);
         LOG.info("Updated tasksToComponentMap:" + tasksToComponent);
 
         this.componentToSortedTasks.putAll(JStormUtils.reverse_map(tmp));
-        for (java.util.Map.Entry<String, List<Integer>> entry : componentToSortedTasks
-                .entrySet()) {
+        for (Map.Entry<String, List<Integer>> entry : componentToSortedTasks.entrySet()) {
             List<Integer> tasks = entry.getValue();
 
             Collections.sort(tasks);
@@ -573,16 +595,13 @@ public class WorkerData {
     }
 
     private void updateStormTopology() {
-        StormTopology rawTmp = null;
-        StormTopology sysTmp = null;
-
+        StormTopology rawTmp;
+        StormTopology sysTmp;
         try {
-            rawTmp =
-                    StormConfig.read_supervisor_topology_code(conf, topologyId);
+            rawTmp = StormConfig.read_supervisor_topology_code(conf, topologyId);
             sysTmp = Common.system_topology(stormConf, rawTopology);
         } catch (IOException e) {
-            LOG.error("Failed to read supervisor topology code for "
-                    + topologyId, e);
+            LOG.error("Failed to read supervisor topology code for " + topologyId, e);
             return;
         } catch (InvalidTopologyException e) {
             LOG.error("Failed to update sysTopology for " + topologyId, e);
@@ -593,8 +612,7 @@ public class WorkerData {
         updateTopology(sysTopology, sysTmp);
     }
 
-    private void updateTopology(StormTopology oldTopology,
-            StormTopology newTopology) {
+    private void updateTopology(StormTopology oldTopology, StormTopology newTopology) {
         oldTopology.set_bolts(newTopology.get_bolts());
         oldTopology.set_spouts(newTopology.get_spouts());
         oldTopology.set_state_spouts(newTopology.get_state_spouts());
@@ -604,12 +622,19 @@ public class WorkerData {
         return monitorEnable;
     }
 
-	public IConnection getRecvConnection() {
-		return recvConnection;
-	}
+    public IConnection getRecvConnection() {
+        return recvConnection;
+    }
 
-	public void setRecvConnection(IConnection recvConnection) {
-		this.recvConnection = recvConnection;
-	}
+    public void setRecvConnection(IConnection recvConnection) {
+        this.recvConnection = recvConnection;
+    }
 
+    public JStormMetricsReporter getMetricsReporter() {
+        return metricReporter;
+    }
+
+    public void setMetricsReporter(JStormMetricsReporter metricReporter) {
+        this.metricReporter = metricReporter;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java
index d8ec622..9d1cca7 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java
@@ -25,8 +25,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 
 /**
- * Worker's Heartbeat data woker will update the object to
- * /LOCAL-DIR/workers/${woker-id}/heartbeats
+ * Worker's heartbeat data; the worker will update the object to /LOCAL-DIR/workers/${worker-id}/heartbeats
  * 
  * @author yannian/Longda
  * 
@@ -39,8 +38,7 @@ public class WorkerHeartbeat implements Serializable {
     private Set<Integer> taskIds;
     private Integer port;
 
-    public WorkerHeartbeat(int timeSecs, String topologyId,
-            Set<Integer> taskIds, Integer port) {
+    public WorkerHeartbeat(int timeSecs, String topologyId, Set<Integer> taskIds, Integer port) {
 
         this.timeSecs = timeSecs;
         this.topologyId = topologyId;
@@ -83,7 +81,6 @@ public class WorkerHeartbeat implements Serializable {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java
new file mode 100644
index 0000000..79075bc
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java
@@ -0,0 +1,38 @@
+package com.alibaba.jstorm.daemon.worker;
+
+import com.alibaba.jstorm.cluster.StormClusterState;
+import com.alibaba.jstorm.utils.TimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.Set;
+
+/**
+ * Reports a worker-level error to ZK once for every task id in a supplied set.
+ * Created by xiaojian.fxj on 2015/8/12.
+ */
+public class WorkerReportError {
+    private static final Logger LOG = LoggerFactory.getLogger(WorkerReportError.class);
+    private final StormClusterState zkCluster;
+    private final String hostName;
+
+    public WorkerReportError(StormClusterState _storm_cluster_state, String _hostName) {
+        this.zkCluster = _storm_cluster_state;
+        this.hostName = _hostName;
+    }
+
+    /**
+     * Records {@code error} plus host, port and current time against each task; failures are only logged.
+     */
+    public void report(String topology_id, Integer worker_port, Set<Integer> tasks, String error) {
+        try {
+            String errorInfo = error + " on " + this.hostName + ":" + worker_port + "," + TimeFormat.getSecond(new Date());
+            for (Integer task : tasks) {
+                zkCluster.report_task_error(topology_id, task, errorInfo, null);
+            }
+        } catch (Exception e) {
+            LOG.error("Failed update " + worker_port + " errors to ZK" + "\n", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java
index 0691fee..403c8cf 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java
@@ -86,14 +86,13 @@ public class WorkerShutdown implements ShutdownableDameon {
             LOG.info("Worker has been shutdown already");
             return;
         }
-        
-        if(recvConnection != null) {
-        	recvConnection.close();
+
+        if (recvConnection != null) {
+            recvConnection.close();
         }
 
         AsyncLoopRunnable.getShutdown().set(true);
         threadPool.shutdown();
-        
 
         // shutdown tasks
         for (ShutdownableDameon task : shutdowntasks) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java
index 783f584..ab4213f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java
@@ -35,8 +35,7 @@ import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.PathUtils;
 
 public class SyncContainerHb extends RunnableCallback {
-    private final static Logger LOG = LoggerFactory
-            .getLogger(SyncContainerHb.class);
+    private final static Logger LOG = LoggerFactory.getLogger(SyncContainerHb.class);
 
     private String readDir;
     private String writeDir;
@@ -113,8 +112,7 @@ public class SyncContainerHb extends RunnableCallback {
         try {
             hb = Long.valueOf(biggest);
         } catch (Exception e) {
-            LOG.info("Heartbeat file " + biggest
-                    + " isn't a valid file, remove it");
+            LOG.info("Heartbeat file " + biggest + " isn't a valid file, remove it");
 
             String path = readDir + File.separator + biggest;
             try {
@@ -151,8 +149,7 @@ public class SyncContainerHb extends RunnableCallback {
             return;
         }
 
-        String seconds =
-                String.valueOf(System.currentTimeMillis() / SECOND_MILLISCOND);
+        String seconds = String.valueOf(System.currentTimeMillis() / SECOND_MILLISCOND);
 
         String path = writeDir + File.separator + seconds;
 
@@ -289,8 +286,7 @@ public class SyncContainerHb extends RunnableCallback {
         this.reserverNum = reserverNum;
     }
 
-    public static AsyncLoopThread mkInstance(String containerHbDir,
-            String hbDir, int timeout, int frequence) {
+    public static AsyncLoopThread mkInstance(String containerHbDir, String hbDir, int timeout, int frequence) {
         SyncContainerHb syncContainerHbThread = new SyncContainerHb();
 
         syncContainerHbThread.setReadDir(containerHbDir);
@@ -306,9 +302,7 @@ public class SyncContainerHb extends RunnableCallback {
         sb.append(",frequence:").append(frequence);
         LOG.info(sb.toString());
 
-        AsyncLoopThread thread =
-                new AsyncLoopThread(syncContainerHbThread, true,
-                        Thread.NORM_PRIORITY, true);
+        AsyncLoopThread thread = new AsyncLoopThread(syncContainerHbThread, true, Thread.NORM_PRIORITY, true);
 
         return thread;
     }
@@ -329,31 +323,23 @@ public class SyncContainerHb extends RunnableCallback {
 
     }
 
-    public static AsyncLoopThread mkSupervisorInstance(Map conf)
-            throws IOException {
-        boolean isEnableContainer =
-                ConfigExtension.isEnableContainerSupervisor();
+    public static AsyncLoopThread mkSupervisorInstance(Map conf) throws IOException {
+        boolean isEnableContainer = ConfigExtension.isEnableContainerSupervisor();
         if (isEnableContainer) {
-            String containerHbDir =
-                    ConfigExtension.getContainerSupervisorHearbeat();
+            String containerHbDir = ConfigExtension.getContainerSupervisorHearbeat();
             String hbDir = StormConfig.supervisorHearbeatForContainer(conf);
-            int timeout =
-                    ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
-            int frequence =
-                    ConfigExtension.getContainerHeartbeatFrequence(conf);
+            int timeout = ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
+            int frequence = ConfigExtension.getContainerHeartbeatFrequence(conf);
 
             return mkInstance(containerHbDir, hbDir, timeout, frequence);
         }
 
-        boolean isWorkerAutomaticStop =
-                ConfigExtension.isWorkerStopWithoutSupervisor(conf);
+        boolean isWorkerAutomaticStop = ConfigExtension.isWorkerStopWithoutSupervisor(conf);
         if (isWorkerAutomaticStop) {
             String containerHbDir = null;
             String hbDir = StormConfig.supervisorHearbeatForContainer(conf);
-            int timeout =
-                    ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
-            int frequence =
-                    ConfigExtension.getContainerHeartbeatFrequence(conf);
+            int timeout = ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
+            int frequence = ConfigExtension.getContainerHeartbeatFrequence(conf);
 
             return mkInstance(containerHbDir, hbDir, timeout, frequence);
         }
@@ -364,17 +350,14 @@ public class SyncContainerHb extends RunnableCallback {
     }
 
     public static AsyncLoopThread mkWorkerInstance(Map conf) throws IOException {
-        boolean isEnableContainer =
-                ConfigExtension.isEnableContainerSupervisor();
-        boolean isWorkerAutomaticStop =
-                ConfigExtension.isWorkerStopWithoutSupervisor(conf);
+        boolean isEnableContainer = ConfigExtension.isEnableContainerSupervisor();
+        boolean isWorkerAutomaticStop = ConfigExtension.isWorkerStopWithoutSupervisor(conf);
         if (isEnableContainer == false && isWorkerAutomaticStop == false) {
             LOG.info("Run worker without Apsara/Yarn container");
             return null;
         }
 
-        String containerHbDir =
-                StormConfig.supervisorHearbeatForContainer(conf);
+        String containerHbDir = StormConfig.supervisorHearbeatForContainer(conf);
         String hbDir = null;
         int timeout = ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
         int frequence = ConfigExtension.getContainerHeartbeatFrequence(conf);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java
index 74aca05..51e74f8 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java
@@ -44,8 +44,7 @@ import com.alibaba.jstorm.utils.TimeUtils;
  * 
  */
 public class WorkerHeartbeatRunable extends RunnableCallback {
-    private static Logger LOG = LoggerFactory
-            .getLogger(WorkerHeartbeatRunable.class);
+    private static Logger LOG = LoggerFactory.getLogger(WorkerHeartbeatRunable.class);
 
     private WorkerData workerData;
 
@@ -69,8 +68,7 @@ public class WorkerHeartbeatRunable extends RunnableCallback {
         this.worker_id = workerData.getWorkerId();
         this.port = workerData.getPort();
         this.topologyId = workerData.getTopologyId();
-        this.task_ids =
-                new CopyOnWriteArraySet<Integer>(workerData.getTaskids());
+        this.task_ids = new CopyOnWriteArraySet<Integer>(workerData.getTaskids());
         this.shutdown = workerData.getShutdown();
 
         String key = Config.WORKER_HEARTBEAT_FREQUENCY_SECS;
@@ -97,11 +95,9 @@ public class WorkerHeartbeatRunable extends RunnableCallback {
     public void doHeartbeat() throws IOException {
 
         int currtime = TimeUtils.current_time_secs();
-        WorkerHeartbeat hb =
-                new WorkerHeartbeat(currtime, topologyId, task_ids, port);
+        WorkerHeartbeat hb = new WorkerHeartbeat(currtime, topologyId, task_ids, port);
 
-        LOG.debug("Doing heartbeat:" + worker_id + ",port:" + port + ",hb"
-                + hb.toString());
+        LOG.debug("Doing heartbeat:" + worker_id + ",port:" + port + ",hb" + hb.toString());
 
         LocalState state = getWorkerState();
         state.put(Common.LS_WORKER_HEARTBEAT, hb);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/BackpressureCheckTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/BackpressureCheckTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/BackpressureCheckTrigger.java
new file mode 100644
index 0000000..19ba2c9
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/BackpressureCheckTrigger.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.daemon.worker.timer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.jstorm.task.backpressure.BackpressureTrigger;
+
+
+/** Timer task that runs {@link BackpressureTrigger#checkAndTrigger()} each time it is executed. */
+public class BackpressureCheckTrigger extends TimerTrigger {
+    private static final Logger LOG = LoggerFactory.getLogger(BackpressureCheckTrigger.class);
+    private BackpressureTrigger trigger;
+
+    /** {@code initDelay} becomes the first delay; a non-positive {@code frequence} is replaced by 1. */
+    public BackpressureCheckTrigger(int initDelay, int frequence, String name, BackpressureTrigger trigger) {
+        if (frequence <= 0) {
+            LOG.warn(" The frequence of " + name + " is invalid");
+            frequence = 1;
+        }
+        this.name = name; // inherited field read by run(); it was never set before, so failures logged "null"
+        this.firstTime = initDelay;
+        this.frequence = frequence;
+        this.trigger = trigger;
+    }
+
+    @Override
+    public void run() {
+        try {
+            trigger.checkAndTrigger();
+        } catch (Exception e) { // logged and swallowed: nothing is rethrown to the caller
+            LOG.warn("Failed to publish timer event to " + name, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java
index 5a59e6f..a1a0990 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java
@@ -29,26 +29,20 @@ import com.alibaba.jstorm.task.acker.Acker;
 import com.alibaba.jstorm.utils.JStormUtils;
 
 public class RotatingMapTrigger extends TimerTrigger {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(RotatingMapTrigger.class);
+    private static final Logger LOG = LoggerFactory.getLogger(RotatingMapTrigger.class);
 
     public RotatingMapTrigger(Map conf, String name, DisruptorQueue queue) {
         this.name = name;
         this.queue = queue;
         this.opCode = TimerConstants.ROTATING_MAP;
 
-        int msgTimeOut =
-                JStormUtils.parseInt(
-                        conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30);
+        int msgTimeOut = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30);
         frequence = (msgTimeOut) / (Acker.TIMEOUT_BUCKET_NUM - 1);
         if (frequence <= 0) {
             frequence = 1;
         }
 
-        firstTime =
-                JStormUtils.parseInt(
-                        conf.get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS),
-                        120);
+        firstTime = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS), 120);
 
         firstTime += frequence;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchCheckTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchCheckTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchCheckTrigger.java
new file mode 100644
index 0000000..0e00f66
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchCheckTrigger.java
@@ -0,0 +1,32 @@
+package com.alibaba.jstorm.daemon.worker.timer;
+
+import org.apache.log4j.Logger;
+
+import com.alibaba.jstorm.task.TaskBatchTransfer;
+
+/** Timer task that calls {@code startCheck()} on a {@link TaskBatchTransfer} each time it is executed. */
+public class TaskBatchCheckTrigger extends TimerTrigger {
+    private static final Logger LOG = Logger.getLogger(TaskBatchCheckTrigger.class); // was TickTupleTrigger.class
+    private TaskBatchTransfer batchTransfer;
+
+    /** {@code frequence} is used as both first delay and period; a non-positive value is replaced by 1. */
+    public TaskBatchCheckTrigger(int frequence, String name, TaskBatchTransfer transfer) {
+        if (frequence <= 0) {
+            LOG.warn(" The frequence of " + name + " is invalid");
+            frequence = 1;
+        }
+        this.name = name; // inherited field read by run(); it was never set before, so failures logged "null"
+        this.firstTime = frequence;
+        this.frequence = frequence;
+        this.batchTransfer = transfer;
+    }
+
+    @Override
+    public void run() {
+        try {
+            batchTransfer.startCheck();
+        } catch (Exception e) { // logged and swallowed: nothing is rethrown to the caller
+            LOG.warn("Failed to publish timer event to " + name, e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java
index 3a5353f..96165d0 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java
@@ -22,9 +22,9 @@ import org.slf4j.LoggerFactory;
 
 import com.alibaba.jstorm.task.TaskBatchTransfer;
 
-public class TaskBatchFlushTrigger extends TimerTrigger{
+public class TaskBatchFlushTrigger extends TimerTrigger {
     private static final Logger LOG = LoggerFactory.getLogger(TickTupleTrigger.class);
-    
+
     private TaskBatchTransfer batchTransfer;
 
     public TaskBatchFlushTrigger(int frequence, String name, TaskBatchTransfer transfer) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java
index 0b67776..ad81a2b 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java
@@ -17,6 +17,7 @@
  */
 package com.alibaba.jstorm.daemon.worker.timer;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
@@ -24,33 +25,66 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import backtype.storm.Config;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleExt;
+import backtype.storm.tuple.TupleImplExt;
+import backtype.storm.tuple.Values;
 import backtype.storm.utils.DisruptorQueue;
 
-import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger.TimerEvent;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.task.error.ITaskReportErr;
+import com.alibaba.jstorm.task.UptimeComputer;
+import com.alibaba.jstorm.utils.IntervalCheck;
 import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
 
 public class TaskHeartbeatTrigger extends TimerTrigger {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(TaskHeartbeatTrigger.class);
+    private static final Logger LOG = LoggerFactory.getLogger(TaskHeartbeatTrigger.class);
 
     private int taskId;
-    
-    private BlockingQueue<Object> controlQueue;
+    private String componentId;
+    private TopologyContext sysTopologyCtx;
 
-    public TaskHeartbeatTrigger(Map conf, String name, DisruptorQueue queue,
-            BlockingQueue<Object> controlQueue, int taskId) {
+    private BlockingQueue<Object> controlQueue = null;
+
+    private OutputCollector boltOutputCollector = null;
+    private SpoutOutputCollector spoutOutputCollector = null;
+
+    private long executeThreadHbTime;
+    private int taskHbTimeout;
+
+    private ITaskReportErr reportError;
+
+    private IntervalCheck intervalCheck;
+
+    private UptimeComputer uptime;
+
+    public TaskHeartbeatTrigger(Map conf, String name, DisruptorQueue queue, BlockingQueue<Object> controlQueue, int taskId, String componentId,
+            TopologyContext sysTopologyCtx, ITaskReportErr reportError) {
         this.name = name;
         this.queue = queue;
         this.controlQueue = controlQueue;
         this.opCode = TimerConstants.TASK_HEARTBEAT;
 
         this.taskId = taskId;
+        this.componentId = componentId;
+        this.sysTopologyCtx = sysTopologyCtx;
+
+        this.frequence = JStormUtils.parseInt(conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10);
+        this.firstTime = frequence;
 
-        frequence =
-                JStormUtils.parseInt(
-                        conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10);
+        this.executeThreadHbTime = TimeUtils.current_time_secs();
+        this.taskHbTimeout = JStormUtils.parseInt(conf.get(Config.NIMBUS_TASK_TIMEOUT_SECS), 180);
+        this.intervalCheck = new IntervalCheck();
+        this.intervalCheck.setInterval(taskHbTimeout);
+        this.intervalCheck.start();
 
-        firstTime = frequence;
+        this.reportError = reportError;
+
+        this.uptime = new UptimeComputer();
     }
 
     @Override
@@ -60,7 +94,6 @@ public class TaskHeartbeatTrigger extends TimerTrigger {
 
     @Override
     public void run() {
-
         try {
             updateObject();
 
@@ -69,16 +102,63 @@ public class TaskHeartbeatTrigger extends TimerTrigger {
                 return;
             }
 
+            if (intervalCheck.check()) {
+                checkExecuteThreadHb();
+            }
+
+            if (componentId.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
+                Values values = new Values(uptime.uptime());
+                TupleExt tuple = new TupleImplExt(sysTopologyCtx, values, taskId, Common.TOPOLOGY_MASTER_HB_STREAM_ID);
+                queue.publish(tuple);
+            } else {
+                // Send task heartbeat to topology master
+                sendHbMsg();
+            }
+
+            // Send message used to monitor execute thread 
             TimerEvent event = new TimerEvent(opCode, object);
-            
-            controlQueue.offer(event);
-            LOG.debug("Offer task HB event to controlQueue, taskId=" + taskId);
+            boolean ret = controlQueue.offer(event);
+            if (ret)
+                LOG.debug("Offer task HB event to controlQueue, taskId=" + taskId);
+            else
+                LOG.debug("Failed to offer task HB event to controlQueue, taskId=" + taskId);
         } catch (Exception e) {
-            LOG.warn("Failed to public timer event to " + name, e);
+            LOG.warn("Failed to publish timer event to " + name, e);
             return;
         }
 
         LOG.debug(" Trigger timer event to " + name);
 
     }
+
+    public void setSpoutOutputCollector(SpoutOutputCollector spoutOutputCollector) {
+        this.spoutOutputCollector = spoutOutputCollector;
+    }
+
+    public void setBoltOutputCollector(OutputCollector boltOutputCollector) {
+        this.boltOutputCollector = boltOutputCollector;
+    }
+
+    public void setExeThreadHbTime(long hbTime) {
+        this.executeThreadHbTime = hbTime;
+    }
+
+    private void sendHbMsg() { // emits the current uptime value on the topology-master heartbeat stream
+        List values = JStormUtils.mk_list(uptime.uptime());
+        if (spoutOutputCollector != null) { // the spout collector, when set, is tried first
+            spoutOutputCollector.emit(Common.TOPOLOGY_MASTER_HB_STREAM_ID, values);
+        } else if (boltOutputCollector != null) {
+            boltOutputCollector.emit(Common.TOPOLOGY_MASTER_HB_STREAM_ID, values);
+        } else { // both collectors are still null, so nothing is emitted this round
+            LOG.warn("Failed to send hearbeat msg. OutputCollector has not been initialized!");
+        }
+    }
+
+    private void checkExecuteThreadHb() { // reports an error when the last recorded execute-thread heartbeat is too old
+        long currentTime = TimeUtils.current_time_secs();
+        if (currentTime - executeThreadHbTime > taskHbTimeout) { // strictly older than the configured task timeout
+            String error = "No response from Task-" + taskId + ", last report time(sec) is " + executeThreadHbTime;
+            reportError.report(error);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java
index ecf01c5..a70d8ab 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java
@@ -29,13 +29,11 @@ import backtype.storm.utils.DisruptorQueue;
 import com.alibaba.jstorm.utils.TimeUtils;
 
 public class TickTupleTrigger extends TimerTrigger {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(TickTupleTrigger.class);
+    private static final Logger LOG = LoggerFactory.getLogger(TickTupleTrigger.class);
 
     TopologyContext topologyContext;
 
-    public TickTupleTrigger(TopologyContext topologyContext, int frequence,
-            String name, DisruptorQueue queue) {
+    public TickTupleTrigger(TopologyContext topologyContext, int frequence, String name, DisruptorQueue queue) {
         this.name = name;
         this.queue = queue;
         this.opCode = TimerConstants.TICK_TUPLE;
@@ -53,10 +51,7 @@ public class TickTupleTrigger extends TimerTrigger {
     @Override
     public void updateObject() {
         this.object =
-                new TupleImplExt(topologyContext, new Values(
-                        TimeUtils.current_time_secs()),
-                        (int) Constants.SYSTEM_TASK_ID,
-                        Constants.SYSTEM_TICK_STREAM_ID);
+                new TupleImplExt(topologyContext, new Values(TimeUtils.current_time_secs()), (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java
index 4cecbea..2c2c39c 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java
@@ -30,13 +30,11 @@ import backtype.storm.utils.DisruptorQueue;
 import com.lmax.disruptor.InsufficientCapacityException;
 
 public class TimerTrigger implements Runnable {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(TimerTrigger.class);
+    private static final Logger LOG = LoggerFactory.getLogger(TimerTrigger.class);
 
     private static ScheduledExecutorService threadPool;
 
-    public static void setScheduledExecutorService(
-            ScheduledExecutorService scheduledExecutorService) {
+    public static void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
         threadPool = scheduledExecutorService;
     }
 
@@ -44,7 +42,7 @@ public class TimerTrigger implements Runnable {
     protected int opCode;
     protected int firstTime;
     protected int frequence;
-    protected DisruptorQueue queue;
+    protected DisruptorQueue queue = null;
     protected Object object;
     protected boolean block = true;
 
@@ -53,8 +51,7 @@ public class TimerTrigger implements Runnable {
     }
 
     public void register(TimeUnit timeUnit) {
-        threadPool.scheduleAtFixedRate(this, firstTime, frequence,
-                timeUnit);
+        threadPool.scheduleAtFixedRate(this, firstTime, frequence, timeUnit);
         LOG.info("Successfully register timer " + this);
     }
 
@@ -145,8 +142,7 @@ public class TimerTrigger implements Runnable {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 
     public class TimerEvent {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java b/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java
index 5f16b0b..4d7d32c 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java
@@ -31,8 +31,7 @@ import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.TimeUtils;
 
 public class ClearThread extends RunnableCallback {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(ClearThread.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ClearThread.class);
 
     private final int REQUEST_TIMEOUT_SECS;
     private static final int TIMEOUT_CHECK_SECS = 5;
@@ -42,10 +41,7 @@ public class ClearThread extends RunnableCallback {
     public ClearThread(Drpc drpc) {
         drpcService = drpc;
 
-        REQUEST_TIMEOUT_SECS =
-                JStormUtils.parseInt(
-                        drpcService.getConf().get(
-                                Config.DRPC_REQUEST_TIMEOUT_SECS), 60);
+        REQUEST_TIMEOUT_SECS = JStormUtils.parseInt(drpcService.getConf().get(Config.DRPC_REQUEST_TIMEOUT_SECS), 60);
         LOG.info("Drpc timeout seconds is " + REQUEST_TIMEOUT_SECS);
     }
 
@@ -56,8 +52,7 @@ public class ClearThread extends RunnableCallback {
             if (TimeUtils.time_delta(e.getValue()) > REQUEST_TIMEOUT_SECS) {
                 String id = e.getKey();
 
-                drpcService.getIdtoResult().put(id,
-                        new DRPCExecutionException("Request timed out"));
+                drpcService.getIdtoResult().put(id, new DRPCExecutionException("Request timed out"));
                 Semaphore s = drpcService.getIdtoSem().get(id);
                 if (s != null) {
                     s.release();

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java b/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java
index 09b4885..adbec06 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.alibaba.jstorm.utils.JStormServerUtils;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.THsHaServer;
@@ -53,8 +54,7 @@ import com.alibaba.jstorm.utils.TimeUtils;
  * @author yannian
  * 
  */
-public class Drpc implements DistributedRPC.Iface,
-        DistributedRPCInvocations.Iface, Shutdownable {
+public class Drpc implements DistributedRPC.Iface, DistributedRPCInvocations.Iface, Shutdownable {
 
     private static final Logger LOG = LoggerFactory.getLogger(Drpc.class);
 
@@ -76,23 +76,18 @@ public class Drpc implements DistributedRPC.Iface,
 
     private AtomicBoolean shutdown = new AtomicBoolean(false);
 
-    private THsHaServer initHandlerServer(Map conf, final Drpc service)
-            throws Exception {
+    private THsHaServer initHandlerServer(Map conf, final Drpc service) throws Exception {
         int port = JStormUtils.parseInt(conf.get(Config.DRPC_PORT));
-        int workerThreadNum =
-                JStormUtils.parseInt(conf.get(Config.DRPC_WORKER_THREADS));
+        int workerThreadNum = JStormUtils.parseInt(conf.get(Config.DRPC_WORKER_THREADS));
         int queueSize = JStormUtils.parseInt(conf.get(Config.DRPC_QUEUE_SIZE));
 
         TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
         THsHaServer.Args targs = new THsHaServer.Args(socket);
         targs.workerThreads(64);
         targs.protocolFactory(new TBinaryProtocol.Factory());
-        targs.processor(new DistributedRPC.Processor<DistributedRPC.Iface>(
-                service));
+        targs.processor(new DistributedRPC.Processor<DistributedRPC.Iface>(service));
 
-        ThreadPoolExecutor executor =
-                new ThreadPoolExecutor(workerThreadNum, workerThreadNum, 60,
-                        TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize));
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(workerThreadNum, workerThreadNum, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize));
         targs.executorService(executor);
 
         THsHaServer handlerServer = new THsHaServer(targs);
@@ -101,17 +96,14 @@ public class Drpc implements DistributedRPC.Iface,
         return handlerServer;
     }
 
-    private THsHaServer initInvokeServer(Map conf, final Drpc service)
-            throws Exception {
+    private THsHaServer initInvokeServer(Map conf, final Drpc service) throws Exception {
         int port = JStormUtils.parseInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
 
         TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
         THsHaServer.Args targsInvoke = new THsHaServer.Args(socket);
         targsInvoke.workerThreads(64);
         targsInvoke.protocolFactory(new TBinaryProtocol.Factory());
-        targsInvoke
-                .processor(new DistributedRPCInvocations.Processor<DistributedRPCInvocations.Iface>(
-                        service));
+        targsInvoke.processor(new DistributedRPCInvocations.Processor<DistributedRPCInvocations.Iface>(service));
 
         THsHaServer invokeServer = new THsHaServer(targsInvoke);
 
@@ -136,6 +128,7 @@ public class Drpc implements DistributedRPC.Iface,
 
         LOG.info("Starting Distributed RPC servers...");
         new Thread(new Runnable() {
+
             @Override
             public void run() {
                 invokeServer.serve();
@@ -148,11 +141,18 @@ public class Drpc implements DistributedRPC.Iface,
         clearThread = new AsyncLoopThread(new ClearThread(this));
         LOG.info("Successfully start clear thread");
     }
+    private void createPid(Map conf) throws Exception {
+        String pidDir = StormConfig.drpcPids(conf);
+
+        JStormServerUtils.createPid(pidDir);
+    }
 
     public void init() throws Exception {
         conf = StormConfig.read_storm_config();
         LOG.info("Configuration is \n" + conf);
 
+        createPid(conf);
+
         initClearThread();
 
         initThrift();
@@ -188,14 +188,10 @@ public class Drpc implements DistributedRPC.Iface,
     }
 
     private AtomicInteger ctr = new AtomicInteger(0);
-    private ConcurrentHashMap<String, Semaphore> idtoSem =
-            new ConcurrentHashMap<String, Semaphore>();
-    private ConcurrentHashMap<String, Object> idtoResult =
-            new ConcurrentHashMap<String, Object>();
-    private ConcurrentHashMap<String, Integer> idtoStart =
-            new ConcurrentHashMap<String, Integer>();
-    private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues =
-            new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>();
+    private ConcurrentHashMap<String, Semaphore> idtoSem = new ConcurrentHashMap<String, Semaphore>();
+    private ConcurrentHashMap<String, Object> idtoResult = new ConcurrentHashMap<String, Object>();
+    private ConcurrentHashMap<String, Integer> idtoStart = new ConcurrentHashMap<String, Integer>();
+    private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>();
 
     public void cleanup(String id) {
         LOG.info("clean id " + id + " @ " + (System.currentTimeMillis()));
@@ -206,10 +202,8 @@ public class Drpc implements DistributedRPC.Iface,
     }
 
     @Override
-    public String execute(String function, String args)
-            throws DRPCExecutionException, TException {
-        LOG.info("Received DRPC request for " + function + " " + args + " at "
-                + (System.currentTimeMillis()));
+    public String execute(String function, String args) throws DRPCExecutionException, TException {
+        LOG.info("Received DRPC request for " + function + " " + args + " at " + (System.currentTimeMillis()));
         int idinc = this.ctr.incrementAndGet();
         int maxvalue = 1000000000;
         int newid = idinc % maxvalue;
@@ -225,19 +219,16 @@ public class Drpc implements DistributedRPC.Iface,
         this.idtoSem.put(strid, sem);
         ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(function);
         queue.add(req);
-        LOG.info("Waiting for DRPC request for " + function + " " + args
-                + " at " + (System.currentTimeMillis()));
+        LOG.info("Waiting for DRPC request for " + function + " " + args + " at " + (System.currentTimeMillis()));
         try {
             sem.acquire();
         } catch (InterruptedException e) {
             LOG.error("acquire fail ", e);
         }
-        LOG.info("Acquired for DRPC request for " + function + " " + args
-                + " at " + (System.currentTimeMillis()));
+        LOG.info("Acquired for DRPC request for " + function + " " + args + " at " + (System.currentTimeMillis()));
 
         Object result = this.idtoResult.get(strid);
-        LOG.info("Returning for DRPC request for " + function + " " + args
-                + " at " + (System.currentTimeMillis()));
+        LOG.info("Returning for DRPC request for " + function + " " + args + " at " + (System.currentTimeMillis()));
 
         this.cleanup(strid);
 
@@ -250,8 +241,7 @@ public class Drpc implements DistributedRPC.Iface,
     @Override
     public void result(String id, String result) throws TException {
         Semaphore sem = this.idtoSem.get(id);
-        LOG.info("Received result " + result + " for id " + id + " at "
-                + (System.currentTimeMillis()));
+        LOG.info("Received result " + result + " for id " + id + " at " + (System.currentTimeMillis()));
         if (sem != null) {
             this.idtoResult.put(id, result);
             sem.release();
@@ -265,8 +255,7 @@ public class Drpc implements DistributedRPC.Iface,
         ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
         DRPCRequest req = queue.poll();
         if (req != null) {
-            LOG.info("Fetched request for " + functionName + " at "
-                    + (System.currentTimeMillis()));
+            LOG.info("Fetched request for " + functionName + " at " + (System.currentTimeMillis()));
             return req;
         } else {
             return new DRPCRequest("", "");
@@ -277,18 +266,15 @@ public class Drpc implements DistributedRPC.Iface,
     @Override
     public void failRequest(String id) throws TException {
         Semaphore sem = this.idtoSem.get(id);
-        LOG.info("failRequest result  for id " + id + " at "
-                + (System.currentTimeMillis()));
+        LOG.info("failRequest result  for id " + id + " at " + (System.currentTimeMillis()));
         if (sem != null) {
-            this.idtoResult.put(id,
-                    new DRPCExecutionException("Request failed"));
+            this.idtoResult.put(id, new DRPCExecutionException("Request failed"));
             sem.release();
         }
     }
 
     private ConcurrentLinkedQueue<DRPCRequest> acquireQueue(String function) {
-        ConcurrentLinkedQueue<DRPCRequest> reqQueue =
-                requestQueues.get(function);
+        ConcurrentLinkedQueue<DRPCRequest> reqQueue = requestQueues.get(function);
         if (reqQueue == null) {
             reqQueue = new ConcurrentLinkedQueue<DRPCRequest>();
             requestQueues.put(function, reqQueue);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java b/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java
index cdfa1cc..2ec7e20 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java
@@ -29,14 +29,12 @@ import com.alibaba.jstorm.callback.RunnableCallback;
  * Event Manager, drop one event from queue, then execute the event.
  */
 public class EventManagerImp extends RunnableCallback implements EventManager {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(EventManagerImp.class);
+    private static final Logger LOG = LoggerFactory.getLogger(EventManagerImp.class);
 
     private AtomicInteger added = new AtomicInteger();
     private AtomicInteger processed = new AtomicInteger();
 
-    private LinkedBlockingQueue<RunnableCallback> queue =
-            new LinkedBlockingQueue<RunnableCallback>();
+    private LinkedBlockingQueue<RunnableCallback> queue = new LinkedBlockingQueue<RunnableCallback>();
 
     private Exception e;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java b/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java
index 7f5e9ef..e4bf9e2 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java
@@ -30,8 +30,7 @@ public class EventManagerPusher extends RunnableCallback {
 
     private int frequence;
 
-    public EventManagerPusher(EventManager eventManager,
-            RunnableCallback event, int frequence) {
+    public EventManagerPusher(EventManager eventManager, RunnableCallback event, int frequence) {
         this.eventManager = eventManager;
         this.event = event;
         this.frequence = frequence;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java
index d68347a..a83f07f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java
@@ -24,11 +24,10 @@ import org.jboss.netty.buffer.ChannelBuffers;
 public enum ControlMessage {
     EOB_MESSAGE((short) -201), OK_RESPONSE((short) -200);
 
-
     private short code;
     private long timeStamp;
     protected static int port;
-    
+
     static public void setPort(int port) {
         ControlMessage.port = port;
     }
@@ -62,9 +61,7 @@ public enum ControlMessage {
      * @throws Exception
      */
     ChannelBuffer buffer() throws Exception {
-        ChannelBufferOutputStream bout =
-                new ChannelBufferOutputStream(
-                        ChannelBuffers.directBuffer(encodeLength()));
+        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));
         write(bout);
         bout.close();
         return bout.buffer();

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java
index 172822d..45d6600 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java
@@ -28,8 +28,7 @@ import org.slf4j.LoggerFactory;
 import backtype.storm.messaging.TaskMessage;
 
 class MessageBatch {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(MessageBatch.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MessageBatch.class);
     private int buffer_size;
     private ArrayList<Object> msgs;
     private int encoded_length;
@@ -58,8 +57,7 @@ class MessageBatch {
             return;
         }
 
-        throw new RuntimeException("Unsuppoted object type "
-                + obj.getClass().getName());
+        throw new RuntimeException("Unsuppoted object type " + obj.getClass().getName());
     }
 
     void remove(Object obj) {
@@ -89,8 +87,7 @@ class MessageBatch {
      * try to add a TaskMessage to a batch
      * 
      * @param taskMsg
-     * @return false if the msg could not be added due to buffer size limit;
-     *         true otherwise
+     * @return false if the msg could not be added due to buffer size limit; true otherwise
      */
     boolean tryAdd(TaskMessage taskMsg) {
         if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size)
@@ -144,9 +141,7 @@ class MessageBatch {
      * create a buffer containing the encoding of this batch
      */
     ChannelBuffer buffer() throws Exception {
-        ChannelBufferOutputStream bout =
-                new ChannelBufferOutputStream(
-                        ChannelBuffers.directBuffer(encoded_length));
+        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
 
         for (Object msg : msgs)
             if (msg instanceof TaskMessage)
@@ -168,19 +163,16 @@ class MessageBatch {
     /**
      * write a TaskMessage into a stream
      * 
-     * Each TaskMessage is encoded as: task ... short(2) len ... int(4) payload
-     * ... byte[] *
+     * Each TaskMessage is encoded as: task ... short(2) len ... int(4) payload ... byte[] *
      */
-    private void writeTaskMessage(ChannelBufferOutputStream bout,
-            TaskMessage message) throws Exception {
+    private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
         int payload_len = 0;
         if (message.message() != null)
             payload_len = message.message().length;
 
         int task_id = message.task();
         if (task_id > Short.MAX_VALUE)
-            throw new RuntimeException("Task ID should not exceed "
-                    + Short.MAX_VALUE);
+            throw new RuntimeException("Task ID should not exceed " + Short.MAX_VALUE);
 
         bout.writeShort((short) task_id);
         bout.writeInt(payload_len);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java
index b147092..38e7930 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java
@@ -17,11 +17,17 @@
  */
 package com.alibaba.jstorm.message.netty;
 
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
+import backtype.storm.messaging.TaskMessage;
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
+import com.alibaba.jstorm.common.metric.AsmMeter;
+import com.alibaba.jstorm.common.metric.AsmMetric;
+import com.alibaba.jstorm.metric.JStormMetrics;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.utils.NetWorkUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -29,46 +35,40 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.common.metric.Histogram;
-import com.alibaba.jstorm.common.metric.Meter;
-import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.metric.MetricDef;
-import com.alibaba.jstorm.utils.NetWorkUtils;
-
-import backtype.storm.messaging.TaskMessage;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 
 public class MessageDecoder extends FrameDecoder {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(MessageDecoder.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MessageDecoder.class);
 
     // here doesn't use Timer due to competition
-    private static Histogram timer = JStormMetrics
-            .registerWorkerHistogram(MetricDef.NETWORK_MSG_DECODE_TIME);
-    private static Meter recvSpeed = JStormMetrics
-            .registerWorkerMeter(MetricDef.NETTY_SRV_RECV_SPEED);
-    private static Map<Channel, Histogram> networkTransmitTimeMap =
-            new HashMap<Channel, Histogram>();
-    private static Map<Channel, String> transmitNameMap =
-            new HashMap<Channel, String>();
+
+    private static AsmHistogram msgDecodeTime = (AsmHistogram) JStormMetrics.registerWorkerMetric(
+            MetricUtils.workerMetricName(MetricDef.NETWORK_MSG_DECODE_TIME, MetricType.HISTOGRAM), new AsmHistogram());
+    private static AsmMeter recvSpeed = (AsmMeter) JStormMetrics.registerWorkerMetric(
+            MetricUtils.workerMetricName(MetricDef.NETTY_SRV_RECV_SPEED, MetricType.METER), new AsmMeter());
+
+    private static Map<Channel, AsmHistogram> networkTransmitTimeMap = new HashMap<Channel, AsmHistogram>();
+    private static Map<Channel, String> transmitNameMap = new HashMap<Channel, String>();
     private boolean isServer;
     private String localIp;
     private int localPort;
 
+    private boolean enableTransitTimeMetrics;
+
     public MessageDecoder(boolean isServer, Map conf) {
         this.isServer = isServer;
         this.localPort = ConfigExtension.getLocalWorkerPort(conf);
         this.localIp = NetWorkUtils.ip();
-
+        this.enableTransitTimeMetrics = MetricUtils.isEnableNettyMetrics(conf);
     }
 
     /*
-     * Each ControlMessage is encoded as: code (<0) ... short(2) Each
-     * TaskMessage is encoded as: task (>=0) ... short(2) len ... int(4) payload
-     * ... byte[] *
+     * Each ControlMessage is encoded as: code (<0) ... short(2) Each TaskMessage is encoded as: task (>=0) ... short(2) len ... int(4) payload ... byte[] *
      */
-    protected Object decode(ChannelHandlerContext ctx, Channel channel,
-            ChannelBuffer buf) throws Exception {
+    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
         // Make sure that we have received at least a short
         long available = buf.readableBytes();
         // Length of control message is 10.
@@ -106,21 +106,20 @@ public class MessageDecoder extends FrameDecoder {
                 available -= 12;
                 if (ctrl_msg == ControlMessage.EOB_MESSAGE) {
 
+
                     long interval = System.currentTimeMillis() - timeStamp;
-                    if (interval > 0) {
-
-	                    Histogram netTransTime =
-	                            getTransmitHistogram(channel, clientPort);
-	                    if (netTransTime != null) {
-	                        netTransTime.update(interval );
-	
-	                    }
+                    if (interval < 0)
+                        interval = 0;
+
+                    if(enableTransitTimeMetrics) {
+                        AsmHistogram netTransTime = getTransmitHistogram(channel, clientPort);
+                        if (netTransTime != null) {
+                            netTransTime.update(interval * TimeUtils.NS_PER_US);
+                        }
                     }
-
-                    recvSpeed.update(Double.valueOf(ControlMessage
-                            .encodeLength()));
                 }
 
+                recvSpeed.update(ControlMessage.encodeLength());
                 return ctrl_msg;
             }
 
@@ -138,9 +137,7 @@ public class MessageDecoder extends FrameDecoder {
             // Read the length field.
             int length = buf.readInt();
             if (length <= 0) {
-                LOG.info(
-                        "Receive one message whose TaskMessage's message length is {}",
-                        length);
+                LOG.info("Receive one message whose TaskMessage's message length is {}", length);
                 return new TaskMessage(task, null);
             }
 
@@ -165,72 +162,55 @@ public class MessageDecoder extends FrameDecoder {
             // task, length, JStormUtils.toPrintableString(rawBytes));
 
             TaskMessage ret = new TaskMessage(task, rawBytes);
-            recvSpeed.update(Double.valueOf(rawBytes.length + 6));
+            recvSpeed.update(rawBytes.length + 6);
             return ret;
         } finally {
             if (isServer) {
                 Long endTime = System.nanoTime();
-                timer.update((endTime - startTime) / 1000000.0d);
+                msgDecodeTime.update((endTime - startTime) / TimeUtils.NS_PER_US);
             }
         }
 
     }
 
-    public Histogram getTransmitHistogram(Channel channel, int clientPort) {
-        Histogram netTransTime = networkTransmitTimeMap.get(channel);
+    public AsmHistogram getTransmitHistogram(Channel channel, int clientPort) {
+        AsmHistogram netTransTime = networkTransmitTimeMap.get(channel);
         if (netTransTime == null) {
+            InetSocketAddress sockAddr = (InetSocketAddress) (channel.getRemoteAddress());
+
+            String nettyConnection = NettyConnection.mkString(sockAddr.getAddress().getHostAddress(), clientPort, localIp, localPort);
+            netTransTime =
+                    (AsmHistogram) JStormMetrics.registerNettyMetric(
+                            MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection), MetricType.HISTOGRAM),
+                            new AsmHistogram());
 
-            InetSocketAddress sockAddr =
-                    (InetSocketAddress) (channel.getRemoteAddress());
-
-            String nettyConnection =
-                    NettyConnection.mkString(sockAddr.getAddress()
-                            .getHostAddress(), clientPort, localIp, localPort);
-            try {
-                netTransTime =
-                        JStormMetrics.registerWorkerHistogram(
-                                MetricDef.NETTY_SRV_MSG_TRANS_TIME,
-                                nettyConnection);
-            } catch (Exception e) {
-                LOG.error("{}.{} has been register",
-                        MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection);
-                removeTransmitHistogram(nettyConnection);
-                return null;
-            }
             networkTransmitTimeMap.put(channel, netTransTime);
             transmitNameMap.put(channel, nettyConnection);
-            LOG.info("Register Transmit Histogram of {}, channel {}",
-                    nettyConnection, channel);
+            LOG.info("Register Transmit Histogram of {}, channel {}", nettyConnection, channel);
         }
 
         return netTransTime;
     }
 
     public static void removeTransmitHistogram(Channel channel) {
-        Histogram netTransTime = networkTransmitTimeMap.remove(channel);
+        AsmHistogram netTransTime = networkTransmitTimeMap.remove(channel);
         if (netTransTime != null) {
-
             String nettyConnection = transmitNameMap.remove(channel);
-
-            JStormMetrics.unregisterWorkerMetric(
-                    MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection);
-
-            LOG.info("Remove Transmit Histogram of {}, channel {}",
-                    nettyConnection, channel);
+            JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection),
+                    MetricType.HISTOGRAM));
+            LOG.info("Remove Transmit Histogram of {}, channel {}", nettyConnection, channel);
         }
     }
-    
+
     public static void removeTransmitHistogram(String nettyConnection) {
         Channel oldChannel = null;
-        
-        for (Entry<Channel, String> entry: transmitNameMap.entrySet()) {
+
+        for (Entry<Channel, String> entry : transmitNameMap.entrySet()) {
             if (nettyConnection.equals(entry.getValue())) {
                 oldChannel = entry.getKey();
             }
         }
-        
+
         removeTransmitHistogram(oldChannel);
     }
-    
-    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java
index 61e9187..0a750e0 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java
@@ -23,8 +23,7 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
 
 public class MessageEncoder extends OneToOneEncoder {
     @Override
-    protected Object encode(ChannelHandlerContext ctx, Channel channel,
-            Object obj) throws Exception {
+    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
         if (obj instanceof ControlMessage) {
             return ((ControlMessage) obj).buffer();
         }
@@ -33,8 +32,7 @@ public class MessageEncoder extends OneToOneEncoder {
             return ((MessageBatch) obj).buffer();
         }
 
-        throw new RuntimeException("Unsupported encoding of object of class "
-                + obj.getClass().getName());
+        throw new RuntimeException("Unsupported encoding of object of class " + obj.getClass().getName());
     }
 
 }