You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:42 UTC

[04/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java
index 8a55ec5..9b86458 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TaskSendTargets.java
@@ -35,8 +35,7 @@ import com.alibaba.jstorm.utils.JStormUtils;
 
 /**
  * 
- * tuple sending object, which get which task should tuple be send to, and
- * update statics
+ * Tuple-sending object: determines which task(s) a tuple should be sent to, and updates statistics
  * 
  * @author yannian/Longda
  * 
@@ -58,8 +57,7 @@ public class TaskSendTargets {
     private boolean isDebuging = false;
     private String debugIdStr;
 
-    public TaskSendTargets(Map<Object, Object> _storm_conf, String _component,
-            Map<String, Map<String, MkGrouper>> _stream_component_grouper,
+    public TaskSendTargets(Map<Object, Object> _storm_conf, String _component, Map<String, Map<String, MkGrouper>> _stream_component_grouper,
             TopologyContext _topology_context, TaskBaseMetric _task_stats) {
         this.stormConf = _storm_conf;
         this.componentId = _component;
@@ -67,17 +65,14 @@ public class TaskSendTargets {
         this.topologyContext = _topology_context;
         this.taskStats = _task_stats;
 
-        isDebuging =
-                JStormUtils.parseBoolean(stormConf.get(Config.TOPOLOGY_DEBUG),
-                        false);
+        isDebuging = JStormUtils.parseBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
 
         taskId = topologyContext.getThisTaskId();
         debugIdStr = " Emit from " + componentId + ":" + taskId + " ";
     }
 
     // direct send tuple to special task
-    public java.util.List<Integer> get(Integer out_task_id, String stream,
-            List<Object> tuple) {
+    public List<Integer> get(Integer out_task_id, String stream, List<Object> tuple) {
 
         // in order to improve acker's speed, skip checking
         // String target_component =
@@ -92,29 +87,26 @@ public class TaskSendTargets {
         // }
 
         if (isDebuging) {
-            LOG.info(debugIdStr + stream + " to " + out_task_id + ":"
-                    + tuple.toString());
+            LOG.info(debugIdStr + stream + " to " + out_task_id + ":" + tuple.toString());
         }
 
         taskStats.send_tuple(stream, 1);
 
-        java.util.List<Integer> out_tasks = new ArrayList<Integer>();
+        List<Integer> out_tasks = new ArrayList<Integer>();
         out_tasks.add(out_task_id);
         return out_tasks;
     }
 
     // send tuple according to grouping
-    public java.util.List<Integer> get(String stream, List<Object> tuple) {
-        java.util.List<Integer> out_tasks = new ArrayList<Integer>();
+    public List<Integer> get(String stream, List<Object> tuple) {
+        List<Integer> out_tasks = new ArrayList<Integer>();
 
         // get grouper, then get which task should tuple be sent to.
-        Map<String, MkGrouper> componentCrouping =
-                streamComponentgrouper.get(stream);
+        Map<String, MkGrouper> componentCrouping = streamComponentgrouper.get(stream);
         if (componentCrouping == null) {
             // if the target component's parallelism is 0, don't need send to
             // them
-            LOG.debug("Failed to get Grouper of " + stream + " in "
-                    + debugIdStr);
+            LOG.debug("Failed to get Grouper of " + stream + " in " + debugIdStr);
             return out_tasks;
         }
 
@@ -123,8 +115,7 @@ public class TaskSendTargets {
             MkGrouper g = ee.getValue();
 
             if (GrouperType.direct.equals(g.gettype())) {
-                throw new IllegalArgumentException(
-                        "Cannot do regular emit to direct stream");
+                throw new IllegalArgumentException("Cannot do regular emit to direct stream");
             }
 
             out_tasks.addAll(g.grouper(tuple));
@@ -133,8 +124,7 @@ public class TaskSendTargets {
 
         if (isDebuging) {
 
-            LOG.info(debugIdStr + stream + " to " + out_tasks + ":"
-                    + tuple.toString());
+            LOG.info(debugIdStr + stream + " to " + out_tasks + ":" + tuple.toString());
         }
 
         int num_out_tasks = out_tasks.size();
@@ -144,8 +134,7 @@ public class TaskSendTargets {
         return out_tasks;
     }
 
-    public void updateStreamCompGrouper(
-            Map<String, Map<String, MkGrouper>> streamComponentgrouper) {
+    public void updateStreamCompGrouper(Map<String, Map<String, MkGrouper>> streamComponentgrouper) {
         this.streamComponentgrouper = streamComponentgrouper;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java
index a6a3406..dc2a2bf 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/TupleInfo.java
@@ -66,8 +66,7 @@ public class TupleInfo implements Serializable {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java
index 72c4061..6b0ed2d 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/comm/UnanchoredSend.java
@@ -32,11 +32,9 @@ import com.alibaba.jstorm.task.TaskTransfer;
  */
 
 public class UnanchoredSend {
-    public static void send(TopologyContext topologyContext,
-            TaskSendTargets taskTargets, TaskTransfer transfer_fn,
-            String stream, List<Object> values) {
+    public static void send(TopologyContext topologyContext, TaskSendTargets taskTargets, TaskTransfer transfer_fn, String stream, List<Object> values) {
 
-        java.util.List<Integer> tasks = taskTargets.get(stream, values);
+        List<Integer> tasks = taskTargets.get(stream, values);
         if (tasks.size() == 0) {
             return;
         }
@@ -44,8 +42,7 @@ public class UnanchoredSend {
         Integer taskId = topologyContext.getThisTaskId();
 
         for (Integer task : tasks) {
-            TupleImplExt tup =
-                    new TupleImplExt(topologyContext, values, taskId, stream);
+            TupleImplExt tup = new TupleImplExt(topologyContext, values, taskId, stream);
             tup.setTargetTaskId(task);
 
             transfer_fn.transfer(tup);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java
index d0d70be..2475ede 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/ITaskReportErr.java
@@ -25,4 +25,6 @@ package com.alibaba.jstorm.task.error;
  */
 public interface ITaskReportErr {
     public void report(Throwable error);
+
+    public void report(String error);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java
index d460e5a..1b41037 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskErrorRunable.java
@@ -20,8 +20,7 @@ package com.alibaba.jstorm.task.error;
 import com.alibaba.jstorm.callback.RunnableCallback;
 
 /**
- * The callback will be called, when task occur error It just call
- * TaskReportErrorAndDie
+ * This callback is invoked when a task encounters an error; it simply calls TaskReportErrorAndDie
  * 
  * @author yannian
  * 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java
index e7506b2..bf177f6 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportError.java
@@ -34,8 +34,7 @@ public class TaskReportError implements ITaskReportErr {
     private String topology_id;
     private int task_id;
 
-    public TaskReportError(StormClusterState _storm_cluster_state,
-            String _topology_id, int _task_id) {
+    public TaskReportError(StormClusterState _storm_cluster_state, String _topology_id, int _task_id) {
         this.zkCluster = _storm_cluster_state;
         this.topology_id = _topology_id;
         this.task_id = _task_id;
@@ -44,16 +43,23 @@ public class TaskReportError implements ITaskReportErr {
     @Override
     public void report(Throwable error) {
 
-        LOG.error("Report error to /ZK/taskerrors/" + topology_id + "/"
-                + task_id + "\n", error);
+        LOG.error("Report error to /ZK/taskerrors/" + topology_id + "/" + task_id + "\n", error);
         try {
             zkCluster.report_task_error(topology_id, task_id, error);
         } catch (Exception e) {
             // TODO Auto-generated catch block
-            LOG.error("Failed update error to /ZK/taskerrors/" + topology_id
-                    + "/" + task_id + "\n", e);
+            LOG.error("Failed update error to /ZK/taskerrors/" + topology_id + "/" + task_id + "\n", e);
         }
-
     }
 
+    @Override
+    public void report(String error) {
+
+        LOG.error("Report error to /ZK/taskerrors/" + topology_id + "/" + task_id + ": " + error);
+        try {
+            zkCluster.report_task_error(topology_id, task_id, error, null);
+        } catch (Exception e) {
+            LOG.error("Failed update error to /ZK/taskerrors/" + topology_id + "/" + task_id + "\n", e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java
index 4f4eab3..e8596de 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/error/TaskReportErrorAndDie.java
@@ -29,15 +29,20 @@ public class TaskReportErrorAndDie implements ITaskReportErr {
     private ITaskReportErr reporterror;
     private RunnableCallback haltfn;
 
-    public TaskReportErrorAndDie(ITaskReportErr _reporterror,
-            RunnableCallback _haltfn) {
+    public TaskReportErrorAndDie(ITaskReportErr _reporterror, RunnableCallback _haltfn) {
         this.reporterror = _reporterror;
         this.haltfn = _haltfn;
     }
 
+    // If a Throwable is caught, the error is reported and the current task is shut down.
     @Override
     public void report(Throwable error) {
         this.reporterror.report(error);
         this.haltfn.run();
     }
+
+    @Override
+    public void report(String error) {
+        this.reporterror.report(error);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java
index 3f4c18f..7e4495a 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BaseExecutors.java
@@ -17,43 +17,40 @@
  */
 package com.alibaba.jstorm.task.execute;
 
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
-import backtype.storm.serialization.KryoTupleDeserializer;
+import backtype.storm.Constants;
+import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
 import backtype.storm.utils.DisruptorQueue;
 import backtype.storm.utils.Utils;
 import backtype.storm.utils.WorkerClassLoader;
-
-import com.alibaba.jstorm.callback.AsyncLoopRunnable;
-import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.callback.RunnableCallback;
 import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.common.metric.Histogram;
+import com.alibaba.jstorm.common.metric.AsmGauge;
 import com.alibaba.jstorm.common.metric.QueueGauge;
 import com.alibaba.jstorm.daemon.worker.timer.RotatingMapTrigger;
+import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger;
 import com.alibaba.jstorm.daemon.worker.timer.TaskHeartbeatTrigger;
-import com.alibaba.jstorm.metric.JStormHealthCheck;
-import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.*;
 import com.alibaba.jstorm.task.Task;
 import com.alibaba.jstorm.task.TaskBaseMetric;
+import com.alibaba.jstorm.task.TaskBatchTransfer;
 import com.alibaba.jstorm.task.TaskStatus;
 import com.alibaba.jstorm.task.TaskTransfer;
 import com.alibaba.jstorm.task.error.ITaskReportErr;
 import com.alibaba.jstorm.utils.JStormServerUtils;
 import com.alibaba.jstorm.utils.JStormUtils;
-import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.ProducerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 
 //import com.alibaba.jstorm.message.zeroMq.IRecvConnection;
 
@@ -67,15 +64,17 @@ import com.lmax.disruptor.dsl.ProducerType;
 public class BaseExecutors extends RunnableCallback {
     private static Logger LOG = LoggerFactory.getLogger(BaseExecutors.class);
 
-    protected final String component_id;
+    protected final String topologyId;
+    protected final String componentId;
     protected final int taskId;
     protected final String idStr;
 
     protected Map storm_conf;
-    
+
     protected final boolean isDebug;
 
     protected TopologyContext userTopologyCtx;
+    protected TopologyContext sysTopologyCtx;
     protected TaskBaseMetric task_stats;
 
     protected volatile TaskStatus taskStatus;
@@ -93,74 +92,91 @@ public class BaseExecutors extends RunnableCallback {
     protected Task task;
     protected long assignmentTs;
     protected TaskTransfer taskTransfer;
+   
+    protected JStormMetricsReporter metricsReporter;
+    
+    protected boolean isFinishInit = false;
+
+    protected RotatingMapTrigger rotatingMapTrigger;
+    protected TaskHeartbeatTrigger taskHbTrigger;
 
     // protected IntervalCheck intervalCheck = new IntervalCheck();
 
-    public BaseExecutors(Task task, TaskTransfer _transfer_fn, Map _storm_conf,
-            Map<Integer, DisruptorQueue> innerTaskTransfer,
-            TopologyContext topology_context, TopologyContext _user_context,
-            TaskBaseMetric _task_stats, TaskStatus taskStatus,
-            ITaskReportErr _report_error) {
+    public BaseExecutors(Task task) {
 
         this.task = task;
-        this.storm_conf = _storm_conf;
-
-        this.userTopologyCtx = _user_context;
-        this.task_stats = _task_stats;
-        this.taskId = topology_context.getThisTaskId();
-        this.innerTaskTransfer = innerTaskTransfer;
-        this.component_id = topology_context.getThisComponentId();
-        this.idStr = JStormServerUtils.getName(component_id, taskId);
-
-        this.taskStatus = taskStatus;
-        this.report_error = _report_error;
-
-        this.isDebug =
-                JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG),
-                        false);
-
-        message_timeout_secs =
-                JStormUtils.parseInt(
-                        storm_conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS),
-                        30);
-
-        int queue_size =
-                Utils.getInt(storm_conf
-                        .get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256);
-        WaitStrategy waitStrategy =
-                (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf);
-        this.exeQueue =
-                DisruptorQueue.mkInstance(idStr, ProducerType.MULTI,
-                        queue_size, waitStrategy);
+        this.storm_conf = task.getStormConf();
+
+        this.userTopologyCtx = task.getUserContext();
+        this.sysTopologyCtx = task.getTopologyContext();
+        this.task_stats = task.getTaskStats();
+        this.taskId = sysTopologyCtx.getThisTaskId();
+        this.innerTaskTransfer = task.getInnerTaskTransfer();
+        this.topologyId = sysTopologyCtx.getTopologyId();
+        this.componentId = sysTopologyCtx.getThisComponentId();
+        this.idStr = JStormServerUtils.getName(componentId, taskId);
+
+        this.taskStatus = task.getTaskStatus();
+        this.report_error = task.getReportErrorDie();
+        this.taskTransfer = task.getTaskTransfer();
+        this.metricsReporter = task.getWorkerData().getMetricsReporter();
+
+        this.isDebug = JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), false);
+
+        message_timeout_secs = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30);
+
+        int queue_size = Utils.getInt(storm_conf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256);
+        WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf);
+        this.exeQueue = DisruptorQueue.mkInstance(idStr, ProducerType.MULTI, queue_size, waitStrategy);
         this.exeQueue.consumerStarted();
-        this.controlQueue = new LinkedBlockingDeque<Object>(8);
+        this.controlQueue = new LinkedBlockingDeque<Object>();
 
         this.registerInnerTransfer(exeQueue);
 
-        QueueGauge exeQueueGauge =
-                new QueueGauge(idStr + MetricDef.EXECUTE_QUEUE, exeQueue);
-        JStormMetrics.registerTaskGauge(exeQueueGauge, taskId,
-                MetricDef.EXECUTE_QUEUE);
-        JStormHealthCheck.registerTaskHealthCheck(taskId,
-                MetricDef.EXECUTE_QUEUE, exeQueueGauge);
+        QueueGauge exeQueueGauge = new QueueGauge(exeQueue, idStr, MetricDef.EXECUTE_QUEUE);
+        JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EXECUTE_QUEUE, MetricType.GAUGE), new AsmGauge(
+                exeQueueGauge));
+        JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.EXECUTE_QUEUE, exeQueueGauge);
 
-        RotatingMapTrigger rotatingMapTrigger =
-                new RotatingMapTrigger(storm_conf, idStr + "_rotating",
-                        exeQueue);
+        rotatingMapTrigger = new RotatingMapTrigger(storm_conf, idStr + "_rotating", exeQueue);
         rotatingMapTrigger.register();
-        TaskHeartbeatTrigger taskHbTrigger =
-                new TaskHeartbeatTrigger(storm_conf, idStr + "_taskHeartbeat",
-                        exeQueue, controlQueue, taskId);
+        taskHbTrigger = new TaskHeartbeatTrigger(storm_conf, idStr + "_taskHeartbeat", exeQueue, controlQueue, taskId, componentId, sysTopologyCtx, report_error);
         taskHbTrigger.register();
-
+        
         assignmentTs = System.currentTimeMillis();
         
-        this.taskTransfer = _transfer_fn;
+    }
+    
+    public void init() throws Exception {
+    	// this function will be overridden by SpoutExecutor or BoltExecutor
+        throw new RuntimeException("Should implement this function");
+    }
+    
+    public void initWrapper() {
+    	try {
+            LOG.info("{} begin to init", idStr);
+            
+            init();
+            
+            if (taskId == getMinTaskIdOfWorker()) {
+                metricsReporter.setOutputCollector(getOutputCollector());
+            }
+            
+            isFinishInit = true;
+        } catch (Throwable e) {
+            error = e;
+            LOG.error("Init error ", e);
+            report_error.report(e);
+        } finally {
+
+            LOG.info("{} initialization finished", idStr);
+            
+        }
     }
 
     @Override
     public void preRun() {
-        WorkerClassLoader.switchThreadContext();  
+        WorkerClassLoader.switchThreadContext();
     }
 
     @Override
@@ -174,28 +190,11 @@ public class BaseExecutors extends RunnableCallback {
         throw new RuntimeException("Should implement this function");
     }
 
-    // @Override
-    // public Object getResult() {
-    // if (taskStatus.isRun()) {
-    // return 0;
-    // } else if (taskStatus.isPause()) {
-    // return 0;
-    // } else if (taskStatus.isShutdown()) {
-    // this.shutdown();
-    // return -1;
-    // } else {
-    // LOG.info("Unknow TaskStatus, shutdown executing thread of " + idStr);
-    // this.shutdown();
-    // return -1;
-    // }
-    // }
-
     @Override
     public Exception error() {
         if (error == null) {
             return null;
         }
-
         return new Exception(error);
     }
 
@@ -213,12 +212,9 @@ public class BaseExecutors extends RunnableCallback {
         LOG.info("Registor inner transfer for executor thread of " + idStr);
         DisruptorQueue existInnerTransfer = innerTaskTransfer.get(taskId);
         if (existInnerTransfer != null) {
-            LOG.info("Exist inner task transfer for executing thread of "
-                    + idStr);
+            LOG.info("Exist inner task transfer for executing thread of " + idStr);
             if (existInnerTransfer != disruptorQueue) {
-                throw new RuntimeException(
-                        "Inner task transfer must be only one in executing thread of "
-                                + idStr);
+                throw new RuntimeException("Inner task transfer must be only one in executing thread of " + idStr);
             }
         }
         innerTaskTransfer.put(taskId, disruptorQueue);
@@ -229,4 +225,12 @@ public class BaseExecutors extends RunnableCallback {
         innerTaskTransfer.remove(taskId);
     }
 
+    protected int getMinTaskIdOfWorker() {
+        SortedSet<Integer> tasks = new TreeSet<Integer>(sysTopologyCtx.getThisWorkerTasks());
+        return tasks.first();
+    }
+    
+    public Object getOutputCollector() {
+    	return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java
index a51d09a..c4ee4ad 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltCollector.java
@@ -28,16 +28,13 @@ import java.util.Random;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.Config;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImplExt;
-
-import com.alibaba.jstorm.common.metric.Histogram;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
+import com.alibaba.jstorm.common.metric.AsmMetric;
 import com.alibaba.jstorm.metric.JStormMetrics;
 import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.task.Task;
 import com.alibaba.jstorm.task.TaskBaseMetric;
 import com.alibaba.jstorm.task.TaskTransfer;
 import com.alibaba.jstorm.task.acker.Acker;
@@ -48,11 +45,18 @@ import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.RotatingMap;
 import com.alibaba.jstorm.utils.TimeUtils;
 
+import backtype.storm.Config;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.MessageId;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleExt;
+import backtype.storm.tuple.TupleImplExt;
+
 /**
  * bolt output interface, do emit/ack/fail
  * 
  * @author yannian/Longda
- * 
  */
 public class BoltCollector implements IOutputCollector {
     private static Logger LOG = LoggerFactory.getLogger(BoltCollector.class);
@@ -72,62 +76,56 @@ public class BoltCollector implements IOutputCollector {
 
     private Map storm_conf;
     private Integer ackerNum;
-    private Histogram timer;
+    private AsmMetric timer;
     private Random random;
-
-    public BoltCollector(int message_timeout_secs, ITaskReportErr report_error,
-            TaskSendTargets _send_fn, Map _storm_conf,
-            TaskTransfer _transfer_fn, TopologyContext _topology_context,
-            Integer task_id, RotatingMap<Tuple, Long> tuple_start_times,
-            TaskBaseMetric _task_stats) {
-
-        this.rotateTime =
-                1000L * message_timeout_secs / (Acker.TIMEOUT_BUCKET_NUM - 1);
-        this.reportError = report_error;
-        this.sendTargets = _send_fn;
-        this.storm_conf = _storm_conf;
-        this.taskTransfer = _transfer_fn;
-        this.topologyContext = _topology_context;
-        this.task_id = task_id;
-        this.task_stats = _task_stats;
-
-        this.pending_acks =
-                new RotatingMap<Tuple, Long>(Acker.TIMEOUT_BUCKET_NUM);
+    
+
+    //ITaskReportErr report_error, TaskSendTargets _send_fn, Map _storm_conf, TaskTransfer _transfer_fn,
+    //TopologyContext _topology_context, Integer task_id,  TaskBaseMetric _task_stats
+    public BoltCollector(Task task, RotatingMap<Tuple, Long> tuple_start_times, int message_timeout_secs) {
+
+        this.rotateTime = 1000L * message_timeout_secs / (Acker.TIMEOUT_BUCKET_NUM - 1);
+        this.reportError = task.getReportErrorDie();
+        this.sendTargets = task.getTaskSendTargets();
+        this.storm_conf = task.getStormConf();
+        this.taskTransfer = task.getTaskTransfer();
+        this.topologyContext = task.getTopologyContext();
+        this.task_id = task.getTaskId();
+        this.task_stats = task.getTaskStats();
+
+        this.pending_acks = new RotatingMap<Tuple, Long>(Acker.TIMEOUT_BUCKET_NUM);
         // this.pending_acks = new TimeCacheMap<Tuple,
         // Long>(message_timeout_secs,
         // Acker.TIMEOUT_BUCKET_NUM);
         this.tuple_start_times = tuple_start_times;
 
-        this.ackerNum =
-                JStormUtils.parseInt(storm_conf
-                        .get(Config.TOPOLOGY_ACKER_EXECUTORS));
+        this.ackerNum = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
 
         String componentId = topologyContext.getThisComponentId();
-        timer =
-                JStormMetrics.registerTaskHistogram(task_id,
-                        MetricDef.COLLECTOR_EMIT_TIME);
+        this.timer =
+                JStormMetrics.registerTaskMetric(
+                        MetricUtils.taskMetricName(topologyContext.getTopologyId(), componentId, task_id, MetricDef.COLLECTOR_EMIT_TIME, MetricType.HISTOGRAM),
+                        new AsmHistogram());
 
         random = new Random();
         random.setSeed(System.currentTimeMillis());
+
     }
 
     @Override
-    public List<Integer> emit(String streamId, Collection<Tuple> anchors,
-            List<Object> tuple) {
+    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
         return boltEmit(streamId, anchors, tuple, null);
     }
 
     @Override
-    public void emitDirect(int taskId, String streamId,
-            Collection<Tuple> anchors, List<Object> tuple) {
+    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
         boltEmit(streamId, anchors, tuple, taskId);
     }
 
-    private List<Integer> boltEmit(String out_stream_id,
-            Collection<Tuple> anchors, List<Object> values, Integer out_task_id) {
-        long start = System.nanoTime();
+    private List<Integer> boltEmit(String out_stream_id, Collection<Tuple> anchors, List<Object> values, Integer out_task_id) {
+        final long start = System.nanoTime();
         try {
-            java.util.List<Integer> out_tasks = null;
+            List<Integer> out_tasks;
             if (out_task_id != null) {
                 out_tasks = sendTargets.get(out_task_id, out_stream_id, values);
             } else {
@@ -146,59 +144,49 @@ public class BoltCollector implements IOutputCollector {
                             lastRotate = now;
                         }
                         put_xor(pending_acks, a, edge_id);
-                        for (Long root_id : a.getMessageId().getAnchorsToIds()
-                                .keySet()) {
+                        for (Long root_id : a.getMessageId().getAnchorsToIds().keySet()) {
                             put_xor(anchors_to_ids, root_id, edge_id);
                         }
                     }
                 }
                 MessageId msgid = MessageId.makeId(anchors_to_ids);
-                TupleImplExt tupleExt =
-                        new TupleImplExt(topologyContext, values, task_id,
-                                out_stream_id, msgid);
+                TupleImplExt tupleExt = new TupleImplExt(topologyContext, values, task_id, out_stream_id, msgid);
                 tupleExt.setTargetTaskId(t);
 
                 taskTransfer.transfer(tupleExt);
-
             }
             return out_tasks;
         } catch (Exception e) {
             LOG.error("bolt emit", e);
         } finally {
             long end = System.nanoTime();
-            timer.update((end - start)/1000000.0d);
+            timer.update((end - start) / TimeUtils.NS_PER_US);
         }
         return new ArrayList<Integer>();
     }
 
     @Override
     public void ack(Tuple input) {
-
         if (ackerNum > 0) {
-
-            Long ack_val = Long.valueOf(0);
+            Long ack_val = 0L;
             Object pend_val = pending_acks.remove(input);
             if (pend_val != null) {
                 ack_val = (Long) (pend_val);
             }
 
-            for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds()
-                    .entrySet()) {
-
-                UnanchoredSend.send(
-                        topologyContext,
-                        sendTargets,
-                        taskTransfer,
-                        Acker.ACKER_ACK_STREAM_ID,
-                        JStormUtils.mk_list((Object) e.getKey(),
-                                JStormUtils.bit_xor(e.getValue(), ack_val)));
+            for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds().entrySet()) {
+                UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, Acker.ACKER_ACK_STREAM_ID,
+                        JStormUtils.mk_list((Object) e.getKey(), JStormUtils.bit_xor(e.getValue(), ack_val)));
             }
         }
 
-        Long delta = tuple_time_delta(tuple_start_times, input);
-        if (delta != null) {
-            task_stats.bolt_acked_tuple(input.getSourceComponent(),
-                    input.getSourceStreamId(), Double.valueOf(delta));
+        Long startTime = (Long) tuple_start_times.remove(input);
+        if (startTime != null) {
+        	Long endTime = System.nanoTime();
+        	long latency = (endTime - startTime)/TimeUtils.NS_PER_US;
+        	long lifeCycle = (System.currentTimeMillis() - ((TupleExt) input).getCreationTimeStamp()) * TimeUtils.NS_PER_US;
+        	
+            task_stats.bolt_acked_tuple(input.getSourceComponent(), input.getSourceStreamId(), latency, lifeCycle);
         }
     }
 
@@ -207,17 +195,12 @@ public class BoltCollector implements IOutputCollector {
         // if ackerNum == 0, we can just return
         if (ackerNum > 0) {
             pending_acks.remove(input);
-            for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds()
-                    .entrySet()) {
-                UnanchoredSend.send(topologyContext, sendTargets, taskTransfer,
-                        Acker.ACKER_FAIL_STREAM_ID,
-                        JStormUtils.mk_list((Object) e.getKey()));
+            for (Entry<Long, Long> e : input.getMessageId().getAnchorsToIds().entrySet()) {
+                UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, Acker.ACKER_FAIL_STREAM_ID, JStormUtils.mk_list((Object) e.getKey()));
             }
         }
 
-        task_stats.bolt_failed_tuple(input.getSourceComponent(),
-                input.getSourceStreamId());
-
+        task_stats.bolt_failed_tuple(input.getSourceComponent(), input.getSourceStreamId());
     }
 
     @Override
@@ -226,21 +209,19 @@ public class BoltCollector implements IOutputCollector {
     }
 
     // Utility functions, just used here
-    public static Long tuple_time_delta(RotatingMap<Tuple, Long> start_times,
-            Tuple tuple) {
+    public static Long tuple_time_delta(RotatingMap<Tuple, Long> start_times, Tuple tuple) {
         Long start_time = (Long) start_times.remove(tuple);
         if (start_time != null) {
-            return TimeUtils.time_delta_ms(start_time);
+            return (System.nanoTime() - start_time)/TimeUtils.NS_PER_US;
         }
         return null;
     }
 
-    public static void put_xor(RotatingMap<Tuple, Long> pending, Tuple key,
-            Long id) {
+    public static void put_xor(RotatingMap<Tuple, Long> pending, Tuple key, Long id) {
         // synchronized (pending) {
         Long curr = pending.get(key);
         if (curr == null) {
-            curr = Long.valueOf(0);
+            curr = 0L;
         }
         pending.put(key, JStormUtils.bit_xor(curr, id));
         // }
@@ -250,7 +231,7 @@ public class BoltCollector implements IOutputCollector {
         // synchronized (pending) {
         Long curr = pending.get(key);
         if (curr == null) {
-            curr = Long.valueOf(0);
+            curr = 0L;
         }
         pending.put(key, JStormUtils.bit_xor(curr, id));
         // }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java
index 15adbf2..5c4413e 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/BoltExecutors.java
@@ -17,13 +17,6 @@
  */
 package com.alibaba.jstorm.task.execute;
 
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.Constants;
 import backtype.storm.task.IBolt;
@@ -32,37 +25,48 @@ import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.BatchTuple;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleExt;
+import backtype.storm.tuple.TupleImplExt;
+import backtype.storm.tuple.Values;
 import backtype.storm.utils.DisruptorQueue;
 import backtype.storm.utils.WorkerClassLoader;
 
 import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.common.metric.Histogram;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
+import com.alibaba.jstorm.common.metric.AsmMetric;
+import com.alibaba.jstorm.daemon.worker.timer.BackpressureCheckTrigger;
 import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger;
 import com.alibaba.jstorm.daemon.worker.timer.TickTupleTrigger;
 import com.alibaba.jstorm.daemon.worker.timer.TimerConstants;
 import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
 import com.alibaba.jstorm.metric.JStormMetrics;
+import com.alibaba.jstorm.metric.JStormMetricsReporter;
 import com.alibaba.jstorm.metric.MetricDef;
-import com.alibaba.jstorm.task.Task;
-import com.alibaba.jstorm.task.TaskBaseMetric;
-import com.alibaba.jstorm.task.TaskStatus;
-import com.alibaba.jstorm.task.TaskTransfer;
-import com.alibaba.jstorm.task.TaskBatchTransfer;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.task.*;
 import com.alibaba.jstorm.task.acker.Acker;
+import com.alibaba.jstorm.task.backpressure.BackpressureTrigger;
 import com.alibaba.jstorm.task.comm.TaskSendTargets;
 import com.alibaba.jstorm.task.error.ITaskReportErr;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.RotatingMap;
 import com.alibaba.jstorm.utils.TimeUtils;
 import com.lmax.disruptor.EventHandler;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
 /**
- * 
  * BoltExecutor
  * 
  * @author yannian/Longda
- * 
  */
 public class BoltExecutors extends BaseExecutors implements EventHandler {
     private static Logger LOG = LoggerFactory.getLogger(BoltExecutors.class);
@@ -76,89 +80,56 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
     // internal outputCollector is BoltCollector
     private OutputCollector outputCollector;
 
-    private Histogram boltExeTimer;
-
-    public BoltExecutors(Task task, IBolt _bolt, TaskTransfer _transfer_fn,
-            Map<Integer, DisruptorQueue> innerTaskTransfer, Map storm_conf,
-            TaskSendTargets _send_fn, TaskStatus taskStatus,
-            TopologyContext sysTopologyCxt, TopologyContext userTopologyCxt,
-            TaskBaseMetric _task_stats, ITaskReportErr _report_error) {
+    private AsmMetric boltExeTimer;
+    private volatile double exeTime;
 
-        super(task, _transfer_fn, storm_conf, innerTaskTransfer,
-                sysTopologyCxt, userTopologyCxt, _task_stats, taskStatus,
-                _report_error);
+    private BackpressureTrigger backpressureTrigger;
+    private boolean isSystemBolt;
 
-        this.bolt = _bolt;
+    //, IBolt _bolt, TaskTransfer _transfer_fn, Map<Integer, DisruptorQueue> innerTaskTransfer, Map storm_conf,
+    //TaskSendTargets _send_fn, TaskStatus taskStatus, TopologyContext sysTopologyCxt, TopologyContext userTopologyCxt, TaskBaseMetric _task_stats,
+    //ITaskReportErr _report_error, JStormMetricsReporter metricReport
+    public BoltExecutors(Task task) {
 
-        // create TimeCacheMap
+        super(task);
 
-        this.tuple_start_times =
-                new RotatingMap<Tuple, Long>(Acker.TIMEOUT_BUCKET_NUM);
+        this.bolt = (IBolt)task.getTaskObj();
 
-        this.ackerNum =
-                JStormUtils.parseInt(storm_conf
-                        .get(Config.TOPOLOGY_ACKER_EXECUTORS));
-
-        // don't use TimeoutQueue for recv_tuple_queue,
-        // then other place should check the queue size
-        // TimeCacheQueue.DefaultExpiredCallback<Tuple> logExpireCb = new
-        // TimeCacheQueue.DefaultExpiredCallback<Tuple>(
-        // idStr);
-        // this.recv_tuple_queue = new
-        // TimeCacheQueue<Tuple>(message_timeout_secs,
-        // TimeCacheQueue.DEFAULT_NUM_BUCKETS, logExpireCb);
+        // create TimeCacheMap
+        this.tuple_start_times = new RotatingMap<Tuple, Long>(Acker.TIMEOUT_BUCKET_NUM);
+        this.ackerNum = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
 
         // create BoltCollector
-        IOutputCollector output_collector =
-                new BoltCollector(message_timeout_secs, _report_error,
-                        _send_fn, storm_conf, _transfer_fn, sysTopologyCxt,
-                        taskId, tuple_start_times, _task_stats);
-
+        IOutputCollector output_collector = new BoltCollector(task, tuple_start_times, message_timeout_secs);
         outputCollector = new OutputCollector(output_collector);
+        taskHbTrigger.setBoltOutputCollector(outputCollector);
 
-        boltExeTimer =
-                JStormMetrics.registerTaskHistogram(taskId,
-                        MetricDef.EXECUTE_TIME);
+        String metricName = MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EXECUTE_TIME, MetricType.HISTOGRAM);
+        this.boltExeTimer = JStormMetrics.registerTaskMetric(metricName, new AsmHistogram());
 
-        Object tickFrequence =
-                storm_conf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
+        Object tickFrequence = storm_conf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
         if (tickFrequence != null) {
             Integer frequence = JStormUtils.parseInt(tickFrequence);
-            TickTupleTrigger tickTupleTrigger =
-                    new TickTupleTrigger(sysTopologyCxt, frequence, idStr
-                            + Constants.SYSTEM_TICK_STREAM_ID, exeQueue);
+            TickTupleTrigger tickTupleTrigger = new TickTupleTrigger(sysTopologyCtx, frequence, idStr + Constants.SYSTEM_TICK_STREAM_ID, exeQueue);
             tickTupleTrigger.register();
         }
-
-        if (ConfigExtension.isTaskBatchTuple(storm_conf)) {
-            TaskBatchFlushTrigger batchFlushTrigger =
-                    new TaskBatchFlushTrigger(5, idStr
-                            + Constants.SYSTEM_COMPONENT_ID,
-                            (TaskBatchTransfer) _transfer_fn);
-            batchFlushTrigger.register(TimeUnit.MILLISECONDS);
-        }
-
-        try {
-            // do prepare
-            WorkerClassLoader.switchThreadContext();
-
-            // Method method = IBolt.class.getMethod("prepare", new Class[]
-            // {Map.class, TopologyContext.class,
-            // OutputCollector.class});
-            // method.invoke(bolt, new Object[] {storm_conf, userTopologyCxt,
-            // outputCollector});
-            bolt.prepare(storm_conf, userTopologyCtx, outputCollector);
-
-        } catch (Throwable e) {
-            error = e;
-            LOG.error("bolt prepare error ", e);
-            report_error.report(e);
-        } finally {
-            WorkerClassLoader.restoreThreadContext();
+       
+        
+        isSystemBolt = Common.isSystemComponent(componentId);
+        if (isSystemBolt == false) {
+            backpressureTrigger = new BackpressureTrigger(task, this, storm_conf, outputCollector);
+            int backpressureCheckFrequence = ConfigExtension.getBackpressureCheckIntervl(storm_conf);
+            BackpressureCheckTrigger backpressureCheckTrigger =
+                    new BackpressureCheckTrigger(30, backpressureCheckFrequence, idStr + " backpressure check trigger", backpressureTrigger);
+            backpressureCheckTrigger.register(TimeUnit.MILLISECONDS);
         }
 
         LOG.info("Successfully create BoltExecutors " + idStr);
-
+    }
+    
+    @Override
+    public void init() {
+    	bolt.prepare(storm_conf, userTopologyCtx, outputCollector);
     }
 
     @Override
@@ -168,10 +139,14 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
 
     @Override
     public void run() {
+    	if (isFinishInit == false) {
+    		initWrapper();
+    	}
         while (taskStatus.isShutdown() == false) {
             try {
+                //if (backpressureTrigger != null)
+                //    backpressureTrigger.checkAndTrigger();
                 exeQueue.consumeBatchWhenAvailable(this);
-
                 processControlEvent();
             } catch (Throwable e) {
                 if (taskStatus.isShutdown() == false) {
@@ -182,20 +157,19 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
     }
 
     @Override
-    public void onEvent(Object event, long sequence, boolean endOfBatch)
-            throws Exception {
-
+    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
         if (event == null) {
             return;
         }
 
         long start = System.nanoTime();
-
         try {
             if (event instanceof Tuple) {
+                processControlEvent();
                 processTupleEvent((Tuple) event);
             } else if (event instanceof BatchTuple) {
                 for (Tuple tuple : ((BatchTuple) event).getTuples()) {
+                    processControlEvent();
                     processTupleEvent((Tuple) tuple);
                 }
             } else if (event instanceof TimerTrigger.TimerEvent) {
@@ -205,18 +179,21 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
             }
         } finally {
             long end = System.nanoTime();
-            boltExeTimer.update((end - start) / 1000000.0d);
+            exeTime = (end - start) / TimeUtils.NS_PER_US;
+            boltExeTimer.update(exeTime);
         }
     }
 
     private void processTupleEvent(Tuple tuple) {
-        task_stats.recv_tuple(tuple.getSourceComponent(),
-                tuple.getSourceStreamId());
-
-        tuple_start_times.put(tuple, System.currentTimeMillis());
+        task_stats.recv_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId());
+        tuple_start_times.put(tuple, System.nanoTime());
 
         try {
-            bolt.execute(tuple);
+            if (isSystemBolt == false && tuple.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
+                backpressureTrigger.handle(tuple);
+            } else {
+                bolt.execute(tuple);
+            }
         } catch (Throwable e) {
             error = e;
             LOG.error("bolt execute error ", e);
@@ -226,11 +203,12 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
         if (ackerNum == 0) {
             // only when acker is disable
             // get tuple process latency
-            Long start_time = (Long) tuple_start_times.remove(tuple);
-            if (start_time != null) {
-                Long delta = TimeUtils.time_delta_ms(start_time);
-                task_stats.bolt_acked_tuple(tuple.getSourceComponent(),
-                        tuple.getSourceStreamId(), Double.valueOf(delta));
+            Long startTime = (Long) tuple_start_times.remove(tuple);
+            if (startTime != null) {
+                Long endTime = System.nanoTime();
+                long latency = (endTime - startTime)/TimeUtils.NS_PER_US;
+                long lifeCycle  = (System.currentTimeMillis() - ((TupleExt) tuple).getCreationTimeStamp()) * TimeUtils.NS_PER_US;
+                task_stats.bolt_acked_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), latency, lifeCycle);
             }
         }
     }
@@ -244,8 +222,7 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
                 // only when acker is enable
                 for (Entry<Tuple, Long> entry : timeoutMap.entrySet()) {
                     Tuple input = entry.getKey();
-                    task_stats.bolt_failed_tuple(input.getSourceComponent(),
-                            input.getSourceStreamId());
+                    task_stats.bolt_failed_tuple(input.getSourceComponent(), input.getSourceStreamId());
                 }
             }
             break;
@@ -262,13 +239,11 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
             break;
         }
         case TimerConstants.TASK_HEARTBEAT: {
-            Integer taskId = (Integer) event.getMsg();
-            TaskHeartbeatRunable.updateTaskHbStats(taskId, task);
+            taskHbTrigger.setExeThreadHbTime(TimeUtils.current_time_secs());
             break;
         }
         default: {
-            LOG.warn("Receive unsupported timer event, opcode="
-                    + event.getOpCode());
+            LOG.warn("Receive unsupported timer event, opcode=" + event.getOpCode());
             break;
         }
         }
@@ -282,9 +257,17 @@ public class BoltExecutors extends BaseExecutors implements EventHandler {
                 processTimerEvent((TimerTrigger.TimerEvent) event);
                 LOG.debug("Received one event from control queue");
             } else {
-                LOG.warn("Received unknown control event, "
-                        + event.getClass().getName());
+                LOG.warn("Received unknown control event, " + event.getClass().getName());
             }
         }
     }
+    
+    public double getExecuteTime() {
+        return exeTime;
+    }
+    
+    @Override
+    public Object getOutputCollector() {
+    	return outputCollector;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java
index d554efc..d3bc53a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/AckSpoutMsg.java
@@ -23,10 +23,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import backtype.storm.spout.ISpout;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleExt;
 
 import com.alibaba.jstorm.client.spout.IAckValueSpout;
 import com.alibaba.jstorm.task.TaskBaseMetric;
 import com.alibaba.jstorm.task.comm.TupleInfo;
+import com.alibaba.jstorm.utils.TimeUtils;
 
 /**
  * The action after spout receive one ack tuple
@@ -38,15 +41,15 @@ public class AckSpoutMsg implements IAckMsg {
     private static Logger LOG = LoggerFactory.getLogger(AckSpoutMsg.class);
 
     private ISpout spout;
+    private Tuple tuple;
+    private TupleInfo tupleInfo;
     private Object msgId;
     private String stream;
-    private long timeStamp;
     private List<Object> values;
     private TaskBaseMetric task_stats;
     private boolean isDebug = false;
 
-    public AckSpoutMsg(ISpout _spout, TupleInfo tupleInfo,
-            TaskBaseMetric _task_stats, boolean _isDebug) {
+    public AckSpoutMsg(ISpout _spout, Tuple tuple, TupleInfo tupleInfo, TaskBaseMetric _task_stats, boolean _isDebug) {
 
         this.task_stats = _task_stats;
 
@@ -55,10 +58,11 @@ public class AckSpoutMsg implements IAckMsg {
 
         this.msgId = tupleInfo.getMessageId();
         this.stream = tupleInfo.getStream();
-        if (tupleInfo.getTimestamp() != 0) {
-            this.timeStamp = System.currentTimeMillis() - tupleInfo.getTimestamp(); 
-        }
+        
         this.values = tupleInfo.getValues();
+
+        this.tuple = tuple;
+        this.tupleInfo = tupleInfo;
     }
 
     public void run() {
@@ -73,7 +77,15 @@ public class AckSpoutMsg implements IAckMsg {
             spout.ack(msgId);
         }
 
-        task_stats.spout_acked_tuple(stream, timeStamp);
+        long latency = 0, lifeCycle = 0;
+        if (tupleInfo.getTimestamp() != 0) {
+        	long endTime = System.nanoTime();
+        	latency = (endTime - tupleInfo.getTimestamp())/TimeUtils.NS_PER_US;
+        	if (tuple != null && tuple instanceof TupleExt) {
+        		lifeCycle = (System.currentTimeMillis() - ((TupleExt) tuple).getCreationTimeStamp()) * TimeUtils.NS_PER_US;
+        	}
+        }
+        task_stats.spout_acked_tuple(stream, latency, lifeCycle);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java
index 7b5d37b..f570a74 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/FailSpoutMsg.java
@@ -40,8 +40,7 @@ public class FailSpoutMsg implements IAckMsg {
     private TaskBaseMetric task_stats;
     private boolean isDebug = false;
 
-    public FailSpoutMsg(Object id, ISpout _spout, TupleInfo _tupleInfo,
-            TaskBaseMetric _task_stats, boolean _isDebug) {
+    public FailSpoutMsg(Object id, ISpout _spout, TupleInfo _tupleInfo, TaskBaseMetric _task_stats, boolean _isDebug) {
         this.id = id;
         this.spout = _spout;
         this.tupleInfo = _tupleInfo;
@@ -63,8 +62,7 @@ public class FailSpoutMsg implements IAckMsg {
         task_stats.spout_failed_tuple(tupleInfo.getStream());
 
         if (isDebug) {
-            LOG.info("Failed message rootId: {}, messageId:{} : {}", id,
-                    msg_id, tupleInfo.getValues().toString());
+            LOG.info("Failed message rootId: {}, messageId:{} : {}", id, msg_id, tupleInfo.getValues().toString());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java
index a74079f..8edd3cc 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/MultipleThreadSpoutExecutors.java
@@ -17,19 +17,13 @@
  */
 package com.alibaba.jstorm.task.execute.spout;
 
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.DisruptorQueue;
 import backtype.storm.utils.WorkerClassLoader;
-
 import com.alibaba.jstorm.callback.AsyncLoopRunnable;
 import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.callback.RunnableCallback;
+import com.alibaba.jstorm.metric.JStormMetricsReporter;
 import com.alibaba.jstorm.task.Task;
 import com.alibaba.jstorm.task.TaskBaseMetric;
 import com.alibaba.jstorm.task.TaskStatus;
@@ -39,35 +33,36 @@ import com.alibaba.jstorm.task.comm.TaskSendTargets;
 import com.alibaba.jstorm.task.comm.TupleInfo;
 import com.alibaba.jstorm.task.error.ITaskReportErr;
 import com.alibaba.jstorm.utils.RotatingMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * spout executor
- * 
+ * <p/>
  * All spout actions will be done here
  * 
  * @author yannian/Longda
- * 
  */
 public class MultipleThreadSpoutExecutors extends SpoutExecutors {
-    private static Logger LOG = LoggerFactory
-            .getLogger(MultipleThreadSpoutExecutors.class);
-
-    public MultipleThreadSpoutExecutors(Task task,
-            backtype.storm.spout.ISpout _spout, TaskTransfer _transfer_fn,
-            Map<Integer, DisruptorQueue> innerTaskTransfer, Map _storm_conf,
-            TaskSendTargets sendTargets, TaskStatus taskStatus,
-            TopologyContext topology_context, TopologyContext _user_context,
-            TaskBaseMetric _task_stats, ITaskReportErr _report_error) {
-        super(task, _spout, _transfer_fn, innerTaskTransfer, _storm_conf,
-                sendTargets, taskStatus, topology_context, _user_context,
-                _task_stats, _report_error);
-
-        ackerRunnableThread = new AsyncLoopThread(new AckerRunnable());
-        pending =
-                new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM,
-                        null, false);
-
-        super.prepare(sendTargets, _transfer_fn, topology_context);
+    private static Logger LOG = LoggerFactory.getLogger(MultipleThreadSpoutExecutors.class);
+
+    public MultipleThreadSpoutExecutors(Task task) {
+        super(task);
+
+        ackerRunnableThread = new AsyncLoopThread(new AckerRunnable(), false, Thread.NORM_PRIORITY, false);
+    }
+    
+    public void mkPending() {
+    	pending = new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM, null, false);
+    }
+    
+    @Override 
+    public void init() throws Exception {
+    	super.init();
+    	ackerRunnableThread.start();
     }
 
     @Override
@@ -75,11 +70,14 @@ public class MultipleThreadSpoutExecutors extends SpoutExecutors {
         return idStr + "-" + MultipleThreadSpoutExecutors.class.getSimpleName();
     }
 
-    @Override
-    public void run() {
+	@Override
+	public void run() {
+		if (isFinishInit == false) {
+			initWrapper();
+		}
 
-        super.nextTuple();
-    }
+		super.nextTuple();
+	}
 
     class AckerRunnable extends RunnableCallback {
 
@@ -117,7 +115,7 @@ public class MultipleThreadSpoutExecutors extends SpoutExecutors {
                 }
 
             }
-            
+
             LOG.info("Successfully shutdown Spout's acker thread " + idStr);
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java
index 9e4dd21..144b041 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SingleThreadSpoutExecutors.java
@@ -17,15 +17,9 @@
  */
 package com.alibaba.jstorm.task.execute.spout;
 
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.DisruptorQueue;
-import backtype.storm.utils.WorkerClassLoader;
-
+import com.alibaba.jstorm.metric.JStormMetricsReporter;
 import com.alibaba.jstorm.task.Task;
 import com.alibaba.jstorm.task.TaskBaseMetric;
 import com.alibaba.jstorm.task.TaskStatus;
@@ -35,35 +29,30 @@ import com.alibaba.jstorm.task.comm.TaskSendTargets;
 import com.alibaba.jstorm.task.comm.TupleInfo;
 import com.alibaba.jstorm.task.error.ITaskReportErr;
 import com.alibaba.jstorm.utils.RotatingMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 
 /**
  * spout executor
- * 
+ * <p/>
  * All spout actions will be done here
  * 
  * @author yannian/Longda
- * 
  */
 public class SingleThreadSpoutExecutors extends SpoutExecutors {
-    private static Logger LOG = LoggerFactory
-            .getLogger(SingleThreadSpoutExecutors.class);
+    private static Logger LOG = LoggerFactory.getLogger(SingleThreadSpoutExecutors.class);
 
-    public SingleThreadSpoutExecutors(Task task,
-            backtype.storm.spout.ISpout _spout, TaskTransfer _transfer_fn,
-            Map<Integer, DisruptorQueue> innerTaskTransfer, Map _storm_conf,
-            TaskSendTargets sendTargets, TaskStatus taskStatus,
-            TopologyContext topology_context, TopologyContext _user_context,
-            TaskBaseMetric _task_stats, ITaskReportErr _report_error) {
-        super(task, _spout, _transfer_fn, innerTaskTransfer, _storm_conf,
-                sendTargets, taskStatus, topology_context, _user_context,
-                _task_stats, _report_error);
+    public SingleThreadSpoutExecutors(Task task) {
+        super(task);
 
-        // sending Tuple's TimeCacheMap
-        pending =
-                new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM,
-                        null, true);
-
-        super.prepare(sendTargets, _transfer_fn, topology_context);
+    }
+    
+    @Override
+    public void mkPending() {
+    	// sending Tuple's TimeCacheMap
+        pending = new RotatingMap<Long, TupleInfo>(Acker.TIMEOUT_BUCKET_NUM, null, true);
     }
 
     @Override
@@ -73,10 +62,14 @@ public class SingleThreadSpoutExecutors extends SpoutExecutors {
 
     @Override
     public void run() {
+    	if (isFinishInit == false ) {
+    		initWrapper();
+    	}
+    	
         executeEvent();
 
         super.nextTuple();
-            
+
         processControlEvent();
 
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java
index d913a9e..baf709f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutCollector.java
@@ -25,16 +25,12 @@ import java.util.Random;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.Config;
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.TupleImplExt;
-import backtype.storm.utils.DisruptorQueue;
-
-import com.alibaba.jstorm.common.metric.Histogram;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
 import com.alibaba.jstorm.metric.JStormMetrics;
 import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.task.Task;
 import com.alibaba.jstorm.task.TaskBaseMetric;
 import com.alibaba.jstorm.task.TaskTransfer;
 import com.alibaba.jstorm.task.acker.Acker;
@@ -44,12 +40,20 @@ import com.alibaba.jstorm.task.comm.UnanchoredSend;
 import com.alibaba.jstorm.task.error.ITaskReportErr;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.TimeOutMap;
+import com.alibaba.jstorm.utils.TimeUtils;
+
+import backtype.storm.Config;
+import backtype.storm.spout.ISpout;
+import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.MessageId;
+import backtype.storm.tuple.TupleImplExt;
+import backtype.storm.utils.DisruptorQueue;
 
 /**
  * spout collector, sending tuple through this Object
  * 
  * @author yannian/Longda
- * 
  */
 public class SpoutCollector implements ISpoutOutputCollector {
     private static Logger LOG = LoggerFactory.getLogger(SpoutCollector.class);
@@ -71,64 +75,53 @@ public class SpoutCollector implements ISpoutOutputCollector {
     private Integer ackerNum;
     private boolean isDebug = false;
 
-    private Histogram emitTotalTimer;
+    private AsmHistogram emitTotalTimer;
     Random random;
 
-    public SpoutCollector(Integer task_id, backtype.storm.spout.ISpout spout,
-            TaskBaseMetric task_stats, TaskSendTargets sendTargets,
-            Map _storm_conf, TaskTransfer _transfer_fn,
-            TimeOutMap<Long, TupleInfo> pending,
-            TopologyContext topology_context,
-            DisruptorQueue disruptorAckerQueue, ITaskReportErr _report_error) {
-        this.sendTargets = sendTargets;
-        this.storm_conf = _storm_conf;
-        this.transfer_fn = _transfer_fn;
+    //Integer task_id, backtype.storm.spout.ISpout spout, TaskBaseMetric task_stats, TaskSendTargets sendTargets, Map _storm_conf,
+    //TaskTransfer _transfer_fn, TimeOutMap<Long, TupleInfo> pending, TopologyContext topology_context, DisruptorQueue disruptorAckerQueue,
+    //ITaskReportErr _report_error
+    public SpoutCollector(Task task, TimeOutMap<Long, TupleInfo> pending, DisruptorQueue disruptorAckerQueue) {
+        this.sendTargets = task.getTaskSendTargets();
+        this.storm_conf = task.getStormConf();
+        this.transfer_fn = task.getTaskTransfer();
         this.pending = pending;
-        this.topology_context = topology_context;
+        this.topology_context = task.getTopologyContext();
 
         this.disruptorAckerQueue = disruptorAckerQueue;
 
-        this.task_stats = task_stats;
-        this.spout = spout;
-        this.task_id = task_id;
-        this.report_error = _report_error;
+        this.task_stats = task.getTaskStats();
+        this.spout = (ISpout)task.getTaskObj();
+        this.task_id = task.getTaskId();
+        this.report_error = task.getReportErrorDie();
 
-        ackerNum =
-                JStormUtils.parseInt(storm_conf
-                        .get(Config.TOPOLOGY_ACKER_EXECUTORS));
-        isDebug =
-                JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG),
-                        false);
+        ackerNum = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
+        isDebug = JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), false);
 
         random = new Random();
         random.setSeed(System.currentTimeMillis());
 
         String componentId = topology_context.getThisComponentId();
         emitTotalTimer =
-                JStormMetrics.registerTaskHistogram(task_id,
-                        MetricDef.COLLECTOR_EMIT_TIME);
-
+                (AsmHistogram) JStormMetrics
+                        .registerTaskMetric(MetricUtils.taskMetricName(topology_context.getTopologyId(), componentId, task_id, MetricDef.COLLECTOR_EMIT_TIME,
+                                MetricType.HISTOGRAM), new AsmHistogram());
     }
 
     @Override
-    public List<Integer> emit(String streamId, List<Object> tuple,
-            Object messageId) {
+    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
         return sendSpoutMsg(streamId, tuple, messageId, null);
     }
 
     @Override
-    public void emitDirect(int taskId, String streamId, List<Object> tuple,
-            Object messageId) {
+    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
         sendSpoutMsg(streamId, tuple, messageId, taskId);
     }
 
-    private List<Integer> sendSpoutMsg(String out_stream_id,
-            List<Object> values, Object message_id, Integer out_task_id) {
-
-        long startTime = System.nanoTime();
-
+    private List<Integer> sendSpoutMsg(String out_stream_id, List<Object> values, Object message_id, Integer out_task_id) {
+        final long startTime = System.nanoTime();
         try {
-            java.util.List<Integer> out_tasks = null;
+            List<Integer> out_tasks;
             if (out_task_id != null) {
                 out_tasks = sendTargets.get(out_task_id, out_stream_id, values);
             } else {
@@ -148,7 +141,7 @@ public class SpoutCollector implements ISpoutOutputCollector {
             // when duplicate root_id, it will miss call ack/fail
             Long root_id = MessageId.generateId(random);
             if (needAck) {
-                while (pending.containsKey(root_id) == true) {
+                while (pending.containsKey(root_id)) {
                     root_id = MessageId.generateId(random);
                 }
             }
@@ -163,30 +156,23 @@ public class SpoutCollector implements ISpoutOutputCollector {
                     msgid = MessageId.makeUnanchored();
                 }
 
-                TupleImplExt tp =
-                        new TupleImplExt(topology_context, values, task_id,
-                                out_stream_id, msgid);
+                TupleImplExt tp = new TupleImplExt(topology_context, values, task_id, out_stream_id, msgid);
                 tp.setTargetTaskId(t);
                 transfer_fn.transfer(tp);
-
             }
 
             if (needAck) {
-
                 TupleInfo info = new TupleInfo();
                 info.setStream(out_stream_id);
                 info.setValues(values);
                 info.setMessageId(message_id);
-                info.setTimestamp(System.currentTimeMillis());
+                info.setTimestamp(System.nanoTime());
 
                 pending.putHead(root_id, info);
 
-                List<Object> ackerTuple =
-                        JStormUtils.mk_list((Object) root_id,
-                                JStormUtils.bit_xor_vals(ackSeq), task_id);
+                List<Object> ackerTuple = JStormUtils.mk_list((Object) root_id, JStormUtils.bit_xor_vals(ackSeq), task_id);
 
-                UnanchoredSend.send(topology_context, sendTargets, transfer_fn,
-                        Acker.ACKER_INIT_STREAM_ID, ackerTuple);
+                UnanchoredSend.send(topology_context, sendTargets, transfer_fn, Acker.ACKER_INIT_STREAM_ID, ackerTuple);
 
             } else if (message_id != null) {
                 TupleInfo info = new TupleInfo();
@@ -195,23 +181,20 @@ public class SpoutCollector implements ISpoutOutputCollector {
                 info.setMessageId(message_id);
                 info.setTimestamp(0);
 
-                AckSpoutMsg ack =
-                        new AckSpoutMsg(spout, info, task_stats, isDebug);
+                AckSpoutMsg ack = new AckSpoutMsg(spout, null, info, task_stats, isDebug);
                 ack.run();
-
             }
 
             return out_tasks;
         } finally {
             long endTime = System.nanoTime();
-            emitTotalTimer.update((endTime - startTime)/1000000.0d);
+            emitTotalTimer.update((endTime - startTime) / TimeUtils.NS_PER_US);
         }
 
     }
 
     @Override
     public void reportError(Throwable error) {
-        // TODO Auto-generated method stub
         report_error.report(error);
     }