You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:00 UTC

[22/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java
index f739087..0bb1bb7 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Cluster.java
@@ -32,7 +32,6 @@ import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
 import com.alibaba.jstorm.schedule.Assignment;
 import com.alibaba.jstorm.task.TaskInfo;
 import com.alibaba.jstorm.task.error.TaskError;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
 import com.alibaba.jstorm.utils.TimeUtils;
 
 /**
@@ -61,6 +60,8 @@ public class Cluster {
     public static final String METRIC_ROOT = "metrics";
 
     public static final String LAST_ERROR = "last_error";
+    public static final String NIMBUS_SLAVE_DETAIL_ROOT= "nimbus_slave_detail";
+    public static final String BACKPRESSURE_ROOT = "backpressure";
 
     public static final String ASSIGNMENTS_SUBTREE;
     public static final String ASSIGNMENTS_BAK_SUBTREE;
@@ -72,6 +73,8 @@ public class Cluster {
     public static final String MASTER_SUBTREE;
     public static final String NIMBUS_SLAVE_SUBTREE;
     public static final String METRIC_SUBTREE;
+    public static final String NIMBUS_SLAVE_DETAIL_SUBTREE;
+    public static final String BACKPRESSURE_SUBTREE;
 
     static {
         ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT;
@@ -84,6 +87,8 @@ public class Cluster {
         MASTER_SUBTREE = ZK_SEPERATOR + MASTER_ROOT;
         NIMBUS_SLAVE_SUBTREE = ZK_SEPERATOR + NIMBUS_SLAVE_ROOT;
         METRIC_SUBTREE = ZK_SEPERATOR + METRIC_ROOT;
+        NIMBUS_SLAVE_DETAIL_SUBTREE = ZK_SEPERATOR + NIMBUS_SLAVE_DETAIL_ROOT;
+        BACKPRESSURE_SUBTREE = ZK_SEPERATOR + BACKPRESSURE_ROOT;
     }
 
     public static String supervisor_path(String id) {
@@ -106,10 +111,6 @@ public class Cluster {
         return TASKBEATS_SUBTREE + ZK_SEPERATOR + topology_id;
     }
 
-    public static String taskbeat_path(String topology_id, int task_id) {
-        return taskbeat_storm_root(topology_id) + ZK_SEPERATOR + task_id;
-    }
-
     public static String taskerror_storm_root(String topology_id) {
         return TASKERRORS_SUBTREE + ZK_SEPERATOR + topology_id;
     }
@@ -130,97 +131,71 @@ public class Cluster {
         return ASSIGNMENTS_BAK_SUBTREE + ZK_SEPERATOR + id;
     }
 
+    public static String backpressure_path(String topology_id) {
+        return BACKPRESSURE_SUBTREE + ZK_SEPERATOR + topology_id;
+    }
+
     @SuppressWarnings("rawtypes")
-    public static StormClusterState mk_storm_cluster_state(
-            Map cluster_state_spec) throws Exception {
+    public static StormClusterState mk_storm_cluster_state(Map cluster_state_spec) throws Exception {
         return new StormZkClusterState(cluster_state_spec);
     }
 
-    public static StormClusterState mk_storm_cluster_state(
-            ClusterState cluster_state_spec) throws Exception {
+    public static StormClusterState mk_storm_cluster_state(ClusterState cluster_state_spec) throws Exception {
         return new StormZkClusterState(cluster_state_spec);
     }
 
-    public static Map<Integer, TaskInfo> get_all_taskInfo(
-            StormClusterState zkCluster, String topologyId) throws Exception {
-        return  zkCluster.task_all_info(topologyId);
+    public static Map<Integer, TaskInfo> get_all_taskInfo(StormClusterState zkCluster, String topologyId) throws Exception {
+        return zkCluster.task_all_info(topologyId);
     }
-    
-    
-    public static Map<Integer, String> get_all_task_component(
-    		StormClusterState zkCluster, String topologyId, 
-    		Map<Integer, TaskInfo> taskInfoMap) throws Exception {
+
+    public static Map<Integer, String> get_all_task_component(StormClusterState zkCluster, String topologyId, Map<Integer, TaskInfo> taskInfoMap)
+            throws Exception {
         if (taskInfoMap == null) {
             taskInfoMap = get_all_taskInfo(zkCluster, topologyId);
         }
-        
+
         if (taskInfoMap == null) {
             return null;
         }
-        
+
         return Common.getTaskToComponent(taskInfoMap);
     }
-    
-    public static  Map<Integer, String> get_all_task_type(
-    		StormClusterState zkCluster, String topologyId, 
-    		Map<Integer, TaskInfo> taskInfoMap) throws Exception {
+
+    public static Map<Integer, String> get_all_task_type(StormClusterState zkCluster, String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception {
         if (taskInfoMap == null) {
             taskInfoMap = get_all_taskInfo(zkCluster, topologyId);
         }
-        
+
         if (taskInfoMap == null) {
             return null;
         }
-        
-        return Common.getTaskToType(taskInfoMap);
-    }
-
-    public static Map<String, TaskHeartbeat> get_all_task_heartbeat(
-            StormClusterState zkCluster, String topologyId) throws Exception {
-        Map<String, TaskHeartbeat> ret = new HashMap<String, TaskHeartbeat>();
-
-        List<String> taskList = zkCluster.heartbeat_tasks(topologyId);
-        for (String taskId : taskList) {
-            TaskHeartbeat hb =
-                    zkCluster.task_heartbeat(topologyId,
-                            Integer.valueOf(taskId));
-            if (hb == null) {
-                LOG.error("Failed to get hearbeat of " + topologyId + ":"
-                        + taskId);
-                continue;
-            }
 
-            ret.put(taskId, hb);
-        }
-
-        return ret;
+        return Common.getTaskToType(taskInfoMap);
     }
 
     /**
-     * if one topology's name equal the input storm_name, then return the
-     * topology id, otherwise return null
+     * if one topology's name equal the input storm_name, then return the topology id, otherwise return null
      * 
      * @param zkCluster
      * @param storm_name
      * @return
      * @throws Exception
      */
-    public static String get_topology_id(StormClusterState zkCluster,
-            String storm_name) throws Exception {
+    public static String get_topology_id(StormClusterState zkCluster, String storm_name) throws Exception {
         List<String> active_storms = zkCluster.active_storms();
         String rtn = null;
         if (active_storms != null) {
             for (String topology_id : active_storms) {
-                
+
                 if (topology_id.indexOf(storm_name) < 0) {
                     continue;
                 }
-
-                String zkTopologyName = Common.topologyIdToName(topology_id);
-                if (storm_name.endsWith(zkTopologyName)) {
-                    return topology_id;
+                StormBase base = zkCluster.storm_base(topology_id, null);
+                if (base != null && storm_name.equals(Common.getTopologyNameById(topology_id))) {
+                    rtn = topology_id;
+                    break;
                 }
-                
+
             }
         }
         return rtn;
@@ -233,8 +208,7 @@ public class Cluster {
      * @return <topology_id, StormBase>
      * @throws Exception
      */
-    public static HashMap<String, StormBase> get_all_StormBase(
-            StormClusterState zkCluster) throws Exception {
+    public static HashMap<String, StormBase> get_all_StormBase(StormClusterState zkCluster) throws Exception {
         HashMap<String, StormBase> rtn = new HashMap<String, StormBase>();
         List<String> active_storms = zkCluster.active_storms();
         if (active_storms != null) {
@@ -253,25 +227,20 @@ public class Cluster {
      * 
      * @param stormClusterState
      * @param callback
-     * @return Map<String, SupervisorInfo> String: supervisorId SupervisorInfo:
-     *         [time-secs hostname worker-ports uptime-secs]
+     * @return Map<String, SupervisorInfo> String: supervisorId SupervisorInfo: [time-secs hostname worker-ports uptime-secs]
      * @throws Exception
      */
-    public static Map<String, SupervisorInfo> get_all_SupervisorInfo(
-            StormClusterState stormClusterState, RunnableCallback callback)
-            throws Exception {
+    public static Map<String, SupervisorInfo> get_all_SupervisorInfo(StormClusterState stormClusterState, RunnableCallback callback) throws Exception {
 
         Map<String, SupervisorInfo> rtn = new TreeMap<String, SupervisorInfo>();
         // get /ZK/supervisors
         List<String> supervisorIds = stormClusterState.supervisors(callback);
         if (supervisorIds != null) {
-            for (Iterator<String> iter = supervisorIds.iterator(); iter
-                    .hasNext();) {
+            for (Iterator<String> iter = supervisorIds.iterator(); iter.hasNext();) {
 
                 String supervisorId = iter.next();
                 // get /supervisors/supervisorid
-                SupervisorInfo supervisorInfo =
-                        stormClusterState.supervisor_info(supervisorId);
+                SupervisorInfo supervisorInfo = stormClusterState.supervisor_info(supervisorId);
                 if (supervisorInfo == null) {
                     LOG.warn("Failed to get SupervisorInfo of " + supervisorId);
                 } else {
@@ -286,9 +255,7 @@ public class Cluster {
         return rtn;
     }
 
-    public static Map<String, Assignment> get_all_assignment(
-            StormClusterState stormClusterState, RunnableCallback callback)
-            throws Exception {
+    public static Map<String, Assignment> get_all_assignment(StormClusterState stormClusterState, RunnableCallback callback) throws Exception {
         Map<String, Assignment> ret = new HashMap<String, Assignment>();
 
         // get /assignments {topology_id}
@@ -300,12 +267,10 @@ public class Cluster {
 
         for (String topology_id : assignments) {
 
-            Assignment assignment =
-                    stormClusterState.assignment_info(topology_id, callback);
+            Assignment assignment = stormClusterState.assignment_info(topology_id, callback);
 
             if (assignment == null) {
-                LOG.error("Failed to get Assignment of " + topology_id
-                        + " from ZK");
+                LOG.error("Failed to get Assignment of " + topology_id + " from ZK");
                 continue;
             }
 
@@ -315,8 +280,7 @@ public class Cluster {
         return ret;
     }
 
-    public static Map<String, String> get_all_nimbus_slave(
-            StormClusterState stormClusterState) throws Exception {
+    public static Map<String, String> get_all_nimbus_slave(StormClusterState stormClusterState) throws Exception {
         List<String> hosts = stormClusterState.get_nimbus_slaves();
         if (hosts == null || hosts.size() == 0) {
             return null;
@@ -331,11 +295,8 @@ public class Cluster {
         return ret;
     }
 
-    public static String get_supervisor_hostname(
-            StormClusterState stormClusterState, String supervisorId)
-            throws Exception {
-        SupervisorInfo supervisorInfo =
-                stormClusterState.supervisor_info(supervisorId);
+    public static String get_supervisor_hostname(StormClusterState stormClusterState, String supervisorId) throws Exception {
+        SupervisorInfo supervisorInfo = stormClusterState.supervisor_info(supervisorId);
         if (supervisorInfo == null) {
             return null;
         } else {
@@ -343,12 +304,9 @@ public class Cluster {
         }
     }
 
-    public static boolean is_topology_exist_error(
-            StormClusterState stormClusterState, String topologyId)
-            throws Exception {
+    public static boolean is_topology_exist_error(StormClusterState stormClusterState, String topologyId) throws Exception {
 
-        Map<Integer, String> lastErrMap =
-                stormClusterState.topo_lastErr_time(topologyId);
+        Map<Integer, String> lastErrMap = stormClusterState.topo_lastErr_time(topologyId);
         if (lastErrMap == null || lastErrMap.size() == 0) {
             return false;
         }
@@ -365,34 +323,33 @@ public class Cluster {
 
         return false;
     }
-    
-	public static Map<Integer, List<TaskError>> get_all_task_errors(
-			StormClusterState stormClusterState, String topologyId) {
-		Map<Integer, List<TaskError>> ret = new HashMap<Integer, List<TaskError>>();
-		try {
-			List<String> errorTasks = stormClusterState.task_error_ids(topologyId);
-			if (errorTasks == null || errorTasks.size() == 0) {
-				return ret;
-			}
-
-			for (String taskIdStr : errorTasks) {
-				Integer taskId = -1;
-				try {
-					taskId = Integer.valueOf(taskIdStr);
-				}catch(Exception e) {
-					// skip last_error
-					continue;
-				}
-				
-				List<TaskError> taskErrorList = stormClusterState.task_errors(topologyId, taskId);
-				ret.put(taskId, taskErrorList);
-			}
-			return ret;
-		} catch (Exception e) {
-			// TODO Auto-generated catch block
-			return ret;
-		}
-
-	}
+
+    public static Map<Integer, List<TaskError>> get_all_task_errors(StormClusterState stormClusterState, String topologyId) {
+        Map<Integer, List<TaskError>> ret = new HashMap<Integer, List<TaskError>>();
+        try {
+            List<String> errorTasks = stormClusterState.task_error_ids(topologyId);
+            if (errorTasks == null || errorTasks.size() == 0) {
+                return ret;
+            }
+
+            for (String taskIdStr : errorTasks) {
+                Integer taskId = -1;
+                try {
+                    taskId = Integer.valueOf(taskIdStr);
+                } catch (Exception e) {
+                    // skip last_error
+                    continue;
+                }
+
+                List<TaskError> taskErrorList = stormClusterState.task_errors(topologyId, taskId);
+                ret.put(taskId, taskErrorList);
+            }
+            return ret;
+        } catch (Exception e) {
+            // TODO Auto-generated catch block
+            return ret;
+        }
+
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java
index 8cba073..ad88717 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java
@@ -39,8 +39,7 @@ public interface ClusterState {
 
     public byte[] get_data_sync(String path, boolean watch) throws Exception;
 
-    public List<String> get_children(String path, boolean watch)
-            throws Exception;
+    public List<String> get_children(String path, boolean watch) throws Exception;
 
     public void mkdirs(String path) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java
index a9e3e0b..48528d7 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/Common.java
@@ -17,35 +17,9 @@
  */
 package com.alibaba.jstorm.cluster;
 
-import java.io.IOException;
-import java.net.URLClassLoader;
-import java.security.InvalidParameterException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-
 import backtype.storm.Config;
 import backtype.storm.Constants;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.JavaObject;
-import backtype.storm.generated.ShellComponent;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.StreamInfo;
+import backtype.storm.generated.*;
 import backtype.storm.metric.SystemBolt;
 import backtype.storm.spout.ShellSpout;
 import backtype.storm.task.IBolt;
@@ -54,17 +28,23 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.ThriftTopologyUtils;
 import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.daemon.worker.WorkerData;
 import com.alibaba.jstorm.schedule.default_assign.DefaultTopologyAssignContext;
-import com.alibaba.jstorm.task.Task;
 import com.alibaba.jstorm.task.TaskInfo;
 import com.alibaba.jstorm.task.acker.Acker;
 import com.alibaba.jstorm.task.group.MkGrouper;
+import com.alibaba.jstorm.task.master.TopologyMaster;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.Thrift;
 import com.alibaba.jstorm.utils.TimeUtils;
 import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URLClassLoader;
+import java.security.InvalidParameterException;
+import java.util.*;
+import java.util.Map.Entry;
 
 /**
  * Base utility function
@@ -75,14 +55,17 @@ import com.google.common.collect.Maps;
  * 
  */
 public class Common {
-    private final static Logger LOG = Logger.getLogger(Common.class);
+    private final static Logger LOG = LoggerFactory.getLogger(Common.class);
+
+    public static final String TOPOLOGY_MASTER_COMPONENT_ID = "__topology_master";
+    public static final String TOPOLOGY_MASTER_HB_STREAM_ID = "__master_task_heartbeat";
+    public static final String TOPOLOGY_MASTER_METRICS_STREAM_ID = "__master_metrics";
+    public static final String TOPOLOGY_MASTER_CONTROL_STREAM_ID = "__master_control_stream";
 
     public static final String ACKER_COMPONENT_ID = Acker.ACKER_COMPONENT_ID;
-    public static final String ACKER_INIT_STREAM_ID =
-            Acker.ACKER_INIT_STREAM_ID;
+    public static final String ACKER_INIT_STREAM_ID = Acker.ACKER_INIT_STREAM_ID;
     public static final String ACKER_ACK_STREAM_ID = Acker.ACKER_ACK_STREAM_ID;
-    public static final String ACKER_FAIL_STREAM_ID =
-            Acker.ACKER_FAIL_STREAM_ID;
+    public static final String ACKER_FAIL_STREAM_ID = Acker.ACKER_FAIL_STREAM_ID;
 
     public static final String SYSTEM_STREAM_ID = "__system";
 
@@ -92,24 +75,20 @@ public class Common {
     public static final String LS_APPROVED_WORKERS = "approved-workers";
     public static final String LS_TASK_CLEANUP_TIMEOUT = "task-cleanup-timeout";
 
-    public static final String compErrorInfo =
-            "ID can only contains a-z, A-Z, 0-9, '-', '_', '.', '$', and should not start with \"__\".";
-    public static final String nameErrorInfo =
-            "Name can only contains a-z, A-Z, 0-9, '-', '_', '.'";
+    public static final String compErrorInfo = "ID can only contains a-z, A-Z, 0-9, '-', '_', '.', '$', and should not start with \"__\".";
+    public static final String nameErrorInfo = "Name can only contains a-z, A-Z, 0-9, '-', '_', '.'";
 
     public static boolean system_id(String id) {
         return Utils.isSystemId(id);
     }
 
-    private static void validate_component(Object obj)
-            throws InvalidTopologyException {
+    private static void validate_component(Object obj) throws InvalidTopologyException {
 
         if (obj instanceof StateSpoutSpec) {
             StateSpoutSpec spec = (StateSpoutSpec) obj;
             for (String id : spec.get_common().get_streams().keySet()) {
                 if (system_id(id) || !charComponentValidate(id)) {
-                    throw new InvalidTopologyException(id
-                            + " is not a valid component id. " + compErrorInfo);
+                    throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo);
                 }
             }
 
@@ -117,16 +96,14 @@ public class Common {
             SpoutSpec spec = (SpoutSpec) obj;
             for (String id : spec.get_common().get_streams().keySet()) {
                 if (system_id(id) || !charComponentValidate(id)) {
-                    throw new InvalidTopologyException(id
-                            + " is not a valid component id. " + compErrorInfo);
+                    throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo);
                 }
             }
         } else if (obj instanceof Bolt) {
             Bolt spec = (Bolt) obj;
             for (String id : spec.get_common().get_streams().keySet()) {
                 if (system_id(id) || !charComponentValidate(id)) {
-                    throw new InvalidTopologyException(id
-                            + " is not a valid component id. " + compErrorInfo);
+                    throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo);
                 }
             }
         } else {
@@ -136,8 +113,7 @@ public class Common {
     }
 
     public static String topologyNameToId(String topologyName, int counter) {
-        return topologyName + "-" + counter + "-"
-                + TimeUtils.current_time_secs();
+        return topologyName + "-" + counter + "-" + TimeUtils.current_time_secs();
     }
 
     public static String getTopologyNameById(String topologyId) {
@@ -151,14 +127,12 @@ public class Common {
     }
 
     /**
-     * Convert topologyId to topologyName. TopologyId =
-     * topoloygName-counter-timeStamp
+     * Convert topologyId to topologyName. TopologyId = topoloygName-counter-timeStamp
      * 
      * @param topologyId
      * @return
      */
-    public static String topologyIdToName(String topologyId)
-            throws InvalidTopologyException {
+    public static String topologyIdToName(String topologyId) throws InvalidTopologyException {
         String ret = null;
         int index = topologyId.lastIndexOf('-');
         if (index != -1 && index > 2) {
@@ -166,17 +140,14 @@ public class Common {
             if (index != -1 && index > 0)
                 ret = topologyId.substring(0, index);
             else
-                throw new InvalidTopologyException(topologyId
-                        + " is not a valid topologyId");
+                throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
         } else
-            throw new InvalidTopologyException(topologyId
-                    + " is not a valid topologyId");
+            throw new InvalidTopologyException(topologyId + " is not a valid topologyId");
         return ret;
     }
 
     /**
-     * Validation of topology name chars. Only alpha char, number, '-', '_', '.'
-     * are valid.
+     * Validation of topology name chars. Only alpha char, number, '-', '_', '.' are valid.
      * 
      * @return
      */
@@ -185,8 +156,7 @@ public class Common {
     }
 
     /**
-     * Validation of topology component chars. Only alpha char, number, '-',
-     * '_', '.', '$' are valid.
+     * Validation of topology component chars. Only alpha char, number, '-', '_', '.', '$' are valid.
      * 
      * @return
      */
@@ -201,12 +171,10 @@ public class Common {
      * @throws InvalidTopologyException
      */
     @SuppressWarnings("unchecked")
-    public static void validate_ids(StormTopology topology, String topologyId)
-            throws InvalidTopologyException {
+    public static void validate_ids(StormTopology topology, String topologyId) throws InvalidTopologyException {
         String topologyName = topologyIdToName(topologyId);
         if (!charValidate(topologyName)) {
-            throw new InvalidTopologyException(topologyName
-                    + " is not a valid topology name. " + nameErrorInfo);
+            throw new InvalidTopologyException(topologyName + " is not a valid topology name. " + nameErrorInfo);
         }
 
         List<String> list = new ArrayList<String>();
@@ -220,9 +188,7 @@ public class Common {
 
                 for (String id : commids) {
                     if (system_id(id) || !charComponentValidate(id)) {
-                        throw new InvalidTopologyException(id
-                                + " is not a valid component id. "
-                                + compErrorInfo);
+                        throw new InvalidTopologyException(id + " is not a valid component id. " + compErrorInfo);
                     }
                 }
 
@@ -236,19 +202,16 @@ public class Common {
 
         List<String> offending = JStormUtils.getRepeat(list);
         if (offending.isEmpty() == false) {
-            throw new InvalidTopologyException("Duplicate component ids: "
-                    + offending);
+            throw new InvalidTopologyException("Duplicate component ids: " + offending);
         }
 
     }
 
-    private static void validate_component_inputs(Object obj)
-            throws InvalidTopologyException {
+    private static void validate_component_inputs(Object obj) throws InvalidTopologyException {
         if (obj instanceof StateSpoutSpec) {
             StateSpoutSpec spec = (StateSpoutSpec) obj;
             if (!spec.get_common().get_inputs().isEmpty()) {
-                throw new InvalidTopologyException(
-                        "May not declare inputs for a spout");
+                throw new InvalidTopologyException("May not declare inputs for a spout");
             }
 
         }
@@ -256,22 +219,18 @@ public class Common {
         if (obj instanceof SpoutSpec) {
             SpoutSpec spec = (SpoutSpec) obj;
             if (!spec.get_common().get_inputs().isEmpty()) {
-                throw new InvalidTopologyException(
-                        "May not declare inputs for a spout");
+                throw new InvalidTopologyException("May not declare inputs for a spout");
             }
         }
     }
 
     /**
-     * Validate the topology 1. component id name is valid or not 2. check some
-     * spout's input is empty or not
+     * Validate the topology 1. component id name is valid or not 2. check some spout's input is empty or not
      * 
      * @param topology
      * @throws InvalidTopologyException
      */
-    public static void validate_basic(StormTopology topology,
-            Map<Object, Object> totalStormConf, String topologyid)
-            throws InvalidTopologyException {
+    public static void validate_basic(StormTopology topology, Map<Object, Object> totalStormConf, String topologyid) throws InvalidTopologyException {
         validate_ids(topology, topologyid);
 
         for (StormTopology._Fields field : Thrift.SPOUT_FIELDS) {
@@ -285,23 +244,15 @@ public class Common {
 
         }
 
-        Integer workerNum =
-                JStormUtils.parseInt(totalStormConf
-                        .get(Config.TOPOLOGY_WORKERS));
+        Integer workerNum = JStormUtils.parseInt(totalStormConf.get(Config.TOPOLOGY_WORKERS));
         if (workerNum == null || workerNum <= 0) {
-            String errMsg =
-                    "There are no Config.TOPOLOGY_WORKERS in configuration of "
-                            + topologyid;
+            String errMsg = "There are no Config.TOPOLOGY_WORKERS in configuration of " + topologyid;
             throw new InvalidParameterException(errMsg);
         }
 
-        Integer ackerNum =
-                JStormUtils.parseInt(totalStormConf
-                        .get(Config.TOPOLOGY_ACKER_EXECUTORS));
+        Integer ackerNum = JStormUtils.parseInt(totalStormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
         if (ackerNum != null && ackerNum < 0) {
-            String errMsg =
-                    "Invalide Config.TOPOLOGY_ACKERS in configuration of "
-                            + topologyid;
+            String errMsg = "Invalide Config.TOPOLOGY_ACKERS in configuration of " + topologyid;
             throw new InvalidParameterException(errMsg);
         }
 
@@ -310,23 +261,139 @@ public class Common {
     /**
      * Generate acker's input Map<GlobalStreamId, Grouping>
      * 
-     * for spout <GlobalStreamId(spoutId, ACKER_INIT_STREAM_ID), ...> for bolt
-     * <GlobalStreamId(boltId, ACKER_ACK_STREAM_ID), ...>
-     * <GlobalStreamId(boltId, ACKER_FAIL_STREAM_ID), ...>
+     * NOTE: despite the summary above (same text as on acker_inputs), this method builds the topology master's inputs: the HB, METRICS and CONTROL
+     * streams of every spout and bolt, plus the master's own HB and METRICS streams, each mapped to the grouping returned by Thrift.mkAllGrouping().
+     * 
+     * @param topology topology whose spouts and bolts are enumerated
+     * @return map from each registered GlobalStreamId to its grouping
+     */
+    public static Map<GlobalStreamId, Grouping> topoMasterInputs(StormTopology topology) {
+        GlobalStreamId stream = null;
+        Grouping group = null;
+        // every spout: register its HB, METRICS and CONTROL streams as inputs
+        Map<GlobalStreamId, Grouping> spout_inputs = new HashMap<GlobalStreamId, Grouping>();
+        Map<String, SpoutSpec> spout_ids = topology.get_spouts();
+        for (Entry<String, SpoutSpec> spout : spout_ids.entrySet()) {
+            String id = spout.getKey();
+
+            stream = new GlobalStreamId(id, TOPOLOGY_MASTER_HB_STREAM_ID);
+            group = Thrift.mkAllGrouping();
+            spout_inputs.put(stream, group);
+
+            stream = new GlobalStreamId(id, TOPOLOGY_MASTER_METRICS_STREAM_ID);
+            group = Thrift.mkAllGrouping();
+            spout_inputs.put(stream, group);
+
+            stream = new GlobalStreamId(id, TOPOLOGY_MASTER_CONTROL_STREAM_ID);
+            group = Thrift.mkAllGrouping();
+            spout_inputs.put(stream, group);
+        }
+        // every bolt: the same three streams
+        Map<String, Bolt> bolt_ids = topology.get_bolts();
+        Map<GlobalStreamId, Grouping> bolt_inputs = new HashMap<GlobalStreamId, Grouping>();
+        for (Entry<String, Bolt> bolt : bolt_ids.entrySet()) {
+            String id = bolt.getKey();
+            stream = new GlobalStreamId(id, TOPOLOGY_MASTER_HB_STREAM_ID);
+            group = Thrift.mkAllGrouping();
+            bolt_inputs.put(stream, group);
+
+            stream = new GlobalStreamId(id, TOPOLOGY_MASTER_METRICS_STREAM_ID);
+            group = Thrift.mkAllGrouping();
+            bolt_inputs.put(stream, group);
+
+            stream = new GlobalStreamId(id, TOPOLOGY_MASTER_CONTROL_STREAM_ID);
+            group = Thrift.mkAllGrouping();
+            bolt_inputs.put(stream, group);
+        }
+        // the topology master itself: only its HB and METRICS streams are registered
+        Map<GlobalStreamId, Grouping> himself_inputs = new HashMap<GlobalStreamId, Grouping>();
+        stream = new GlobalStreamId(TOPOLOGY_MASTER_COMPONENT_ID, TOPOLOGY_MASTER_HB_STREAM_ID);
+        group = Thrift.mkAllGrouping();
+        himself_inputs.put(stream, group);
+
+        stream = new GlobalStreamId(TOPOLOGY_MASTER_COMPONENT_ID, TOPOLOGY_MASTER_METRICS_STREAM_ID);
+        group = Thrift.mkAllGrouping();
+        himself_inputs.put(stream, group);
+        // merge the three partial maps into the result
+        Map<GlobalStreamId, Grouping> allInputs = new HashMap<GlobalStreamId, Grouping>();
+        allInputs.putAll(bolt_inputs);
+        allInputs.putAll(spout_inputs);
+        allInputs.putAll(himself_inputs);
+        return allInputs;
+    }
+
+    /**
+     * Add the topology master bolt to ret; every existing bolt/spout gets METRICS, HB and CONTROL output streams and an input from the master's CONTROL stream. stormConf is unused.
+     */
+    public static void addTopologyMaster(Map stormConf, StormTopology ret) {
+        // output streams declared by the topology master: CONTROL, METRICS and HB
+        HashMap<String, StreamInfo> outputs = new HashMap<String, StreamInfo>();
+
+        List<String> list = JStormUtils.mk_list(TopologyMaster.FILED_CTRL_EVENT);
+        outputs.put(TOPOLOGY_MASTER_CONTROL_STREAM_ID, Thrift.outputFields(list));
+        list = JStormUtils.mk_list(TopologyMaster.FIELD_METRIC_WORKER, TopologyMaster.FIELD_METRIC_METRICS);
+        outputs.put(TOPOLOGY_MASTER_METRICS_STREAM_ID, Thrift.outputFields(list));
+        list = JStormUtils.mk_list(TopologyMaster.FILED_HEARBEAT_EVENT);
+        outputs.put(TOPOLOGY_MASTER_HB_STREAM_ID, Thrift.outputFields(list));
+
+        IBolt topologyMaster = new TopologyMaster();
+
+        // generate inputs
+        Map<GlobalStreamId, Grouping> inputs = topoMasterInputs(ret);
+
+        // build the topology master Bolt; NOTE(review): the trailing 1 is presumably its parallelism hint - confirm against Thrift.mkBolt
+        Bolt topologyMasterBolt = Thrift.mkBolt(inputs, topologyMaster, outputs, 1);
+
+        // every bolt gets the METRICS, HB and CONTROL output streams and an input from the master's CONTROL stream
+        for (Entry<String, Bolt> e : ret.get_bolts().entrySet()) {
+            Bolt bolt = e.getValue();
+            ComponentCommon common = bolt.get_common();
+            List<String> fields = JStormUtils.mk_list(TopologyMaster.FIELD_METRIC_WORKER, TopologyMaster.FIELD_METRIC_METRICS);
+            common.put_to_streams(TOPOLOGY_MASTER_METRICS_STREAM_ID, Thrift.directOutputFields(fields));
+            fields = JStormUtils.mk_list(TopologyMaster.FILED_HEARBEAT_EVENT);
+            common.put_to_streams(TOPOLOGY_MASTER_HB_STREAM_ID, Thrift.directOutputFields(fields));
+            fields = JStormUtils.mk_list(TopologyMaster.FILED_CTRL_EVENT);
+            common.put_to_streams(TOPOLOGY_MASTER_CONTROL_STREAM_ID, Thrift.directOutputFields(fields));
+
+            GlobalStreamId stream = new GlobalStreamId(TOPOLOGY_MASTER_COMPONENT_ID, TOPOLOGY_MASTER_CONTROL_STREAM_ID);
+            common.put_to_inputs(stream, Thrift.mkDirectGrouping());
+            bolt.set_common(common);
+        }
+        // every spout gets the same streams and input
+        for (Entry<String, SpoutSpec> kv : ret.get_spouts().entrySet()) {
+            SpoutSpec spout = kv.getValue();
+            ComponentCommon common = spout.get_common();
+            List<String> fields = JStormUtils.mk_list(TopologyMaster.FIELD_METRIC_WORKER, TopologyMaster.FIELD_METRIC_METRICS);
+            common.put_to_streams(TOPOLOGY_MASTER_METRICS_STREAM_ID, Thrift.directOutputFields(fields));
+            fields = JStormUtils.mk_list(TopologyMaster.FILED_HEARBEAT_EVENT);
+            common.put_to_streams(TOPOLOGY_MASTER_HB_STREAM_ID, Thrift.directOutputFields(fields));
+            fields = JStormUtils.mk_list(TopologyMaster.FILED_CTRL_EVENT);
+            common.put_to_streams(TOPOLOGY_MASTER_CONTROL_STREAM_ID, Thrift.directOutputFields(fields));
+
+            GlobalStreamId stream = new GlobalStreamId(TOPOLOGY_MASTER_COMPONENT_ID, TOPOLOGY_MASTER_CONTROL_STREAM_ID);
+            common.put_to_inputs(stream, Thrift.mkDirectGrouping());
+            spout.set_common(common);
+        }
+        // the master is added after both loops, so the loops above never modify it
+        ret.put_to_bolts(TOPOLOGY_MASTER_COMPONENT_ID, topologyMasterBolt);
+    }
+
+    /**
+     * Generate acker's input Map<GlobalStreamId, Grouping>
+     * 
+     * for spout <GlobalStreamId(spoutId, ACKER_INIT_STREAM_ID), ...> for bolt <GlobalStreamId(boltId, ACKER_ACK_STREAM_ID), ...> <GlobalStreamId(boltId,
+     * ACKER_FAIL_STREAM_ID), ...>
      * 
      * @param topology
      * @return
      */
-    public static Map<GlobalStreamId, Grouping> acker_inputs(
-            StormTopology topology) {
-        Map<GlobalStreamId, Grouping> spout_inputs =
-                new HashMap<GlobalStreamId, Grouping>();
+    public static Map<GlobalStreamId, Grouping> acker_inputs(StormTopology topology) {
+        Map<GlobalStreamId, Grouping> spout_inputs = new HashMap<GlobalStreamId, Grouping>();
         Map<String, SpoutSpec> spout_ids = topology.get_spouts();
         for (Entry<String, SpoutSpec> spout : spout_ids.entrySet()) {
             String id = spout.getKey();
 
-            GlobalStreamId stream =
-                    new GlobalStreamId(id, ACKER_INIT_STREAM_ID);
+            GlobalStreamId stream = new GlobalStreamId(id, ACKER_INIT_STREAM_ID);
 
             Grouping group = Thrift.mkFieldsGrouping(JStormUtils.mk_list("id"));
 
@@ -334,27 +401,21 @@ public class Common {
         }
 
         Map<String, Bolt> bolt_ids = topology.get_bolts();
-        Map<GlobalStreamId, Grouping> bolt_inputs =
-                new HashMap<GlobalStreamId, Grouping>();
+        Map<GlobalStreamId, Grouping> bolt_inputs = new HashMap<GlobalStreamId, Grouping>();
         for (Entry<String, Bolt> bolt : bolt_ids.entrySet()) {
             String id = bolt.getKey();
 
-            GlobalStreamId streamAck =
-                    new GlobalStreamId(id, ACKER_ACK_STREAM_ID);
-            Grouping groupAck =
-                    Thrift.mkFieldsGrouping(JStormUtils.mk_list("id"));
+            GlobalStreamId streamAck = new GlobalStreamId(id, ACKER_ACK_STREAM_ID);
+            Grouping groupAck = Thrift.mkFieldsGrouping(JStormUtils.mk_list("id"));
 
-            GlobalStreamId streamFail =
-                    new GlobalStreamId(id, ACKER_FAIL_STREAM_ID);
-            Grouping groupFail =
-                    Thrift.mkFieldsGrouping(JStormUtils.mk_list("id"));
+            GlobalStreamId streamFail = new GlobalStreamId(id, ACKER_FAIL_STREAM_ID);
+            Grouping groupFail = Thrift.mkFieldsGrouping(JStormUtils.mk_list("id"));
 
             bolt_inputs.put(streamAck, groupAck);
             bolt_inputs.put(streamFail, groupFail);
         }
 
-        Map<GlobalStreamId, Grouping> allInputs =
-                new HashMap<GlobalStreamId, Grouping>();
+        Map<GlobalStreamId, Grouping> allInputs = new HashMap<GlobalStreamId, Grouping>();
         allInputs.putAll(bolt_inputs);
         allInputs.putAll(spout_inputs);
         return allInputs;
@@ -397,12 +458,10 @@ public class Common {
 
             List<String> ackList = JStormUtils.mk_list("id", "ack-val");
 
-            common.put_to_streams(ACKER_ACK_STREAM_ID,
-                    Thrift.outputFields(ackList));
+            common.put_to_streams(ACKER_ACK_STREAM_ID, Thrift.outputFields(ackList));
 
             List<String> failList = JStormUtils.mk_list("id");
-            common.put_to_streams(ACKER_FAIL_STREAM_ID,
-                    Thrift.outputFields(failList));
+            common.put_to_streams(ACKER_FAIL_STREAM_ID, Thrift.outputFields(failList));
 
             bolt.set_common(common);
         }
@@ -414,17 +473,13 @@ public class Common {
         for (Entry<String, SpoutSpec> kv : ret.get_spouts().entrySet()) {
             SpoutSpec bolt = kv.getValue();
             ComponentCommon common = bolt.get_common();
-            List<String> initList =
-                    JStormUtils.mk_list("id", "init-val", "spout-task");
-            common.put_to_streams(ACKER_INIT_STREAM_ID,
-                    Thrift.outputFields(initList));
+            List<String> initList = JStormUtils.mk_list("id", "init-val", "spout-task");
+            common.put_to_streams(ACKER_INIT_STREAM_ID, Thrift.outputFields(initList));
 
-            GlobalStreamId ack_ack =
-                    new GlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID);
+            GlobalStreamId ack_ack = new GlobalStreamId(ACKER_COMPONENT_ID, ACKER_ACK_STREAM_ID);
             common.put_to_inputs(ack_ack, Thrift.mkDirectGrouping());
 
-            GlobalStreamId ack_fail =
-                    new GlobalStreamId(ACKER_COMPONENT_ID, ACKER_FAIL_STREAM_ID);
+            GlobalStreamId ack_fail = new GlobalStreamId(ACKER_COMPONENT_ID, ACKER_FAIL_STREAM_ID);
             common.put_to_inputs(ack_fail, Thrift.mkDirectGrouping());
         }
 
@@ -480,26 +535,21 @@ public class Common {
 
     public static StormTopology add_system_components(StormTopology topology) {
         // generate inputs
-        Map<GlobalStreamId, Grouping> inputs =
-                new HashMap<GlobalStreamId, Grouping>();
+        Map<GlobalStreamId, Grouping> inputs = new HashMap<GlobalStreamId, Grouping>();
 
         // generate outputs
         HashMap<String, StreamInfo> outputs = new HashMap<String, StreamInfo>();
         ArrayList<String> fields = new ArrayList<String>();
 
-        outputs.put(Constants.SYSTEM_TICK_STREAM_ID,
-                Thrift.outputFields(JStormUtils.mk_list("rate_secs")));
-        outputs.put(Constants.METRICS_TICK_STREAM_ID,
-                Thrift.outputFields(JStormUtils.mk_list("interval")));
-        outputs.put(Constants.CREDENTIALS_CHANGED_STREAM_ID,
-                Thrift.outputFields(JStormUtils.mk_list("creds")));
+        outputs.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(JStormUtils.mk_list("rate_secs")));
+        outputs.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(JStormUtils.mk_list("interval")));
+        outputs.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(JStormUtils.mk_list("creds")));
 
         // ComponentCommon common = new ComponentCommon(inputs, outputs);
 
         IBolt ackerbolt = new SystemBolt();
 
-        Bolt bolt =
-                Thrift.mkBolt(inputs, ackerbolt, outputs, Integer.valueOf(0));
+        Bolt bolt = Thrift.mkBolt(inputs, ackerbolt, outputs, Integer.valueOf(0));
 
         topology.put_to_bolts(Constants.SYSTEM_COMPONENT_ID, bolt);
 
@@ -539,13 +589,15 @@ public class Common {
     }
 
     @SuppressWarnings("rawtypes")
-    public static StormTopology system_topology(Map storm_conf,
-            StormTopology topology) throws InvalidTopologyException {
+    public static StormTopology system_topology(Map storm_conf, StormTopology topology) throws InvalidTopologyException {
 
         StormTopology ret = topology.deepCopy();
 
         add_acker(storm_conf, ret);
 
+        if(StormConfig.local_mode(storm_conf) == false)
+            addTopologyMaster(storm_conf, ret);
+
         add_metrics_component(ret);
 
         add_system_components(ret);
@@ -562,8 +614,7 @@ public class Common {
      * @return
      */
     @SuppressWarnings("unchecked")
-    public static Map component_conf(Map storm_conf,
-            TopologyContext topology_context, String component_id) {
+    public static Map component_conf(Map storm_conf, TopologyContext topology_context, String component_id) {
         List<Object> to_remove = StormConfig.All_CONFIGS();
         to_remove.remove(Config.TOPOLOGY_DEBUG);
         to_remove.remove(Config.TOPOLOGY_MAX_SPOUT_PENDING);
@@ -572,16 +623,13 @@ public class Common {
 
         Map<Object, Object> componentConf = new HashMap<Object, Object>();
 
-        String jconf =
-                topology_context.getComponentCommon(component_id)
-                        .get_json_conf();
+        String jconf = topology_context.getComponentCommon(component_id).get_json_conf();
         if (jconf != null) {
             componentConf = (Map<Object, Object>) JStormUtils.from_json(jconf);
         }
 
         /**
-         * @@@ Don't know why need remove system configuration from component
-         *     conf? //
+         * @@@ Don't know why need remove system configuration from component conf? //
          */
         // for (Object p : to_remove) {
         // componentConf.remove(p);
@@ -601,8 +649,7 @@ public class Common {
      * @param component_id
      * @return
      */
-    public static Object get_task_object(StormTopology topology,
-            String component_id, URLClassLoader loader) {
+    public static Object get_task_object(StormTopology topology, String component_id, URLClassLoader loader) {
         Map<String, SpoutSpec> spouts = topology.get_spouts();
         Map<String, Bolt> bolts = topology.get_bolts();
         Map<String, StateSpoutSpec> state_spouts = topology.get_state_spouts();
@@ -617,8 +664,7 @@ public class Common {
         }
 
         if (obj == null) {
-            throw new RuntimeException("Could not find " + component_id
-                    + " in " + topology.toString());
+            throw new RuntimeException("Could not find " + component_id + " in " + topology.toString());
         }
 
         Object componentObject = Utils.getSetComponentObject(obj, loader);
@@ -646,43 +692,34 @@ public class Common {
      * @param topology_context
      * @return
      */
-    public static Map<String, Map<String, MkGrouper>> outbound_components(
-            TopologyContext topology_context, WorkerData workerData) {
-        Map<String, Map<String, MkGrouper>> rr =
-                new HashMap<String, Map<String, MkGrouper>>();
+    public static Map<String, Map<String, MkGrouper>> outbound_components(TopologyContext topology_context, WorkerData workerData) {
+        Map<String, Map<String, MkGrouper>> rr = new HashMap<String, Map<String, MkGrouper>>();
 
         // <Stream_id,<component,Grouping>>
-        Map<String, Map<String, Grouping>> output_groupings =
-                topology_context.getThisTargets();
+        Map<String, Map<String, Grouping>> output_groupings = topology_context.getThisTargets();
 
-        for (Entry<String, Map<String, Grouping>> entry : output_groupings
-                .entrySet()) {
+        for (Entry<String, Map<String, Grouping>> entry : output_groupings.entrySet()) {
 
             String stream_id = entry.getKey();
             Map<String, Grouping> component_grouping = entry.getValue();
 
             Fields out_fields = topology_context.getThisOutputFields(stream_id);
 
-            Map<String, MkGrouper> componentGrouper =
-                    new HashMap<String, MkGrouper>();
+            Map<String, MkGrouper> componentGrouper = new HashMap<String, MkGrouper>();
 
             for (Entry<String, Grouping> cg : component_grouping.entrySet()) {
 
                 String component = cg.getKey();
                 Grouping tgrouping = cg.getValue();
 
-                List<Integer> outTasks =
-                        topology_context.getComponentTasks(component);
+                List<Integer> outTasks = topology_context.getComponentTasks(component);
                 // ATTENTION: If topology set one component parallelism as 0
                 // so we don't need send tuple to it
                 if (outTasks.size() > 0) {
-                    MkGrouper grouper =
-                            new MkGrouper(topology_context, out_fields,
-                                    tgrouping, outTasks, stream_id, workerData);
+                    MkGrouper grouper = new MkGrouper(topology_context, out_fields, tgrouping, outTasks, stream_id, workerData);
                     componentGrouper.put(component, grouper);
                 }
-                LOG.info("outbound_components, outTasks=" + outTasks
-                        + " for task-" + topology_context.getThisTaskId());
+                LOG.info("outbound_components, outTasks=" + outTasks + " for task-" + topology_context.getThisTaskId());
             }
             if (componentGrouper.size() > 0) {
                 rr.put(stream_id, componentGrouper);
@@ -696,17 +733,12 @@ public class Common {
      * 
      * @param topology_context
      * @param task_id
-     * @return component's configurations
      */
-    public static Map getComponentMap(DefaultTopologyAssignContext context,
-            Integer task) {
+    public static Map getComponentMap(DefaultTopologyAssignContext context, Integer task) {
         String componentName = context.getTaskToComponent().get(task);
-        ComponentCommon componentCommon =
-                ThriftTopologyUtils.getComponentCommon(
-                        context.getSysTopology(), componentName);
+        ComponentCommon componentCommon = ThriftTopologyUtils.getComponentCommon(context.getSysTopology(), componentName);
 
-        Map componentMap =
-                (Map) JStormUtils.from_json(componentCommon.get_json_conf());
+        Map componentMap = (Map) JStormUtils.from_json(componentCommon.get_json_conf());
         if (componentMap == null) {
             componentMap = Maps.newHashMap();
         }
@@ -714,22 +746,17 @@ public class Common {
     }
 
     /**
-     * get all bolts' inputs and spouts' outputs <Bolt_name, <Input_name>>
-     * <Spout_name, <Output_name>>
+     * get all bolts' inputs and spouts' outputs <Bolt_name, <Input_name>> <Spout_name, <Output_name>>
      * 
      * @param topology_context
      * @return all bolts' inputs and spouts' outputs
      */
-    public static Map<String, Set<String>> buildSpoutOutoputAndBoltInputMap(
-            DefaultTopologyAssignContext context) {
+    public static Map<String, Set<String>> buildSpoutOutoputAndBoltInputMap(DefaultTopologyAssignContext context) {
         Set<String> bolts = context.getRawTopology().get_bolts().keySet();
         Set<String> spouts = context.getRawTopology().get_spouts().keySet();
-        Map<String, Set<String>> relationship =
-                new HashMap<String, Set<String>>();
-        for (Entry<String, Bolt> entry : context.getRawTopology().get_bolts()
-                .entrySet()) {
-            Map<GlobalStreamId, Grouping> inputs =
-                    entry.getValue().get_common().get_inputs();
+        Map<String, Set<String>> relationship = new HashMap<String, Set<String>>();
+        for (Entry<String, Bolt> entry : context.getRawTopology().get_bolts().entrySet()) {
+            Map<GlobalStreamId, Grouping> inputs = entry.getValue().get_common().get_inputs();
             Set<String> input = new HashSet<String>();
             relationship.put(entry.getKey(), input);
             for (Entry<GlobalStreamId, Grouping> inEntry : inputs.entrySet()) {
@@ -759,37 +786,34 @@ public class Common {
 
     public static Map<Integer, String> getTaskToComponent(Map<Integer, TaskInfo> taskInfoMap) {
         Map<Integer, String> ret = new TreeMap<Integer, String>();
-        for (Entry<Integer, TaskInfo> entry :taskInfoMap.entrySet()) {
+        for (Entry<Integer, TaskInfo> entry : taskInfoMap.entrySet()) {
             ret.put(entry.getKey(), entry.getValue().getComponentId());
         }
-        
+
         return ret;
     }
-    
+
     public static Map<Integer, String> getTaskToType(Map<Integer, TaskInfo> taskInfoMap) {
         Map<Integer, String> ret = new TreeMap<Integer, String>();
-        for (Entry<Integer, TaskInfo> entry :taskInfoMap.entrySet()) {
+        for (Entry<Integer, TaskInfo> entry : taskInfoMap.entrySet()) {
             ret.put(entry.getKey(), entry.getValue().getComponentType());
         }
-        
+
         return ret;
     }
-    
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public static Integer mkTaskMaker(Map<Object, Object> stormConf, 
-                    Map<String, ?> cidSpec, 
-                    Map<Integer, TaskInfo> rtn, 
-                    Integer cnt) {
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static Integer mkTaskMaker(Map<Object, Object> stormConf, Map<String, ?> cidSpec, Map<Integer, TaskInfo> rtn, Integer cnt) {
         if (cidSpec == null) {
             LOG.warn("Component map is empty");
             return cnt;
         }
-        
+
         Set<?> entrySet = cidSpec.entrySet();
         for (Iterator<?> it = entrySet.iterator(); it.hasNext();) {
             Entry entry = (Entry) it.next();
             Object obj = entry.getValue();
-            
+
             ComponentCommon common = null;
             String componentType = "bolt";
             if (obj instanceof Bolt) {
@@ -802,22 +826,22 @@ public class Common {
                 common = ((StateSpoutSpec) obj).get_common();
                 componentType = "spout";
             }
-            
+
             if (common == null) {
                 throw new RuntimeException("No ComponentCommon of " + entry.getKey());
             }
-            
+
             int declared = Thrift.parallelismHint(common);
             Integer parallelism = declared;
             // Map tmp = (Map) Utils_clj.from_json(common.get_json_conf());
-            
+
             Map newStormConf = new HashMap(stormConf);
             // newStormConf.putAll(tmp);
             Integer maxParallelism = JStormUtils.parseInt(newStormConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
             if (maxParallelism != null) {
                 parallelism = Math.min(maxParallelism, declared);
             }
-            
+
             for (int i = 0; i < parallelism; i++) {
                 cnt++;
                 TaskInfo taskInfo = new TaskInfo((String) entry.getKey(), componentType);
@@ -826,20 +850,24 @@ public class Common {
         }
         return cnt;
     }
-    
-    public static Map<Integer, TaskInfo> mkTaskInfo(
-                    Map<Object, Object> stormConf, 
-                    StormTopology sysTopology, 
-                    String topologyid) {
-        
+
+    public static Map<Integer, TaskInfo> mkTaskInfo(Map<Object, Object> stormConf, StormTopology sysTopology, String topologyid) {
+
         // use TreeMap to make task as sequence
         Map<Integer, TaskInfo> rtn = new TreeMap<Integer, TaskInfo>();
-        
+
         Integer count = 0;
         count = mkTaskMaker(stormConf, sysTopology.get_bolts(), rtn, count);
         count = mkTaskMaker(stormConf, sysTopology.get_spouts(), rtn, count);
         count = mkTaskMaker(stormConf, sysTopology.get_state_spouts(), rtn, count);
-        
+
         return rtn;
     }
+
+    /** Tells whether componentId equals the acker's or the topology master's component id. */
+    public static boolean isSystemComponent(String componentId) {
+        // equals is invoked on componentId itself, so a null argument raises NullPointerException
+        return componentId.equals(Acker.ACKER_COMPONENT_ID)
+                || componentId.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java
index 2ebce83..3d25a25 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java
@@ -49,8 +49,7 @@ import com.alibaba.jstorm.zk.Zookeeper;
  */
 public class DistributedClusterState implements ClusterState {
 
-    private static Logger LOG = LoggerFactory
-            .getLogger(DistributedClusterState.class);
+    private static Logger LOG = LoggerFactory.getLogger(DistributedClusterState.class);
 
     private Zookeeper zkobj = new Zookeeper();
     private CuratorFramework zk;
@@ -59,8 +58,7 @@ public class DistributedClusterState implements ClusterState {
     /**
      * why run all callbacks, when receive one event
      */
-    private ConcurrentHashMap<UUID, ClusterStateCallback> callbacks =
-            new ConcurrentHashMap<UUID, ClusterStateCallback>();
+    private ConcurrentHashMap<UUID, ClusterStateCallback> callbacks = new ConcurrentHashMap<UUID, ClusterStateCallback>();
 
     private Map<Object, Object> conf;
     private AtomicBoolean active;
@@ -83,16 +81,13 @@ public class DistributedClusterState implements ClusterState {
             public void execute(KeeperState state, EventType type, String path) {
                 if (active.get()) {
                     if (!(state.equals(KeeperState.SyncConnected))) {
-                        LOG.warn("Received event " + state + ":" + type + ":"
-                                + path + " with disconnected Zookeeper.");
+                        LOG.warn("Received event " + state + ":" + type + ":" + path + " with disconnected Zookeeper.");
                     } else {
-                        LOG.info("Received event " + state + ":" + type + ":"
-                                + path);
+                        LOG.info("Received event " + state + ":" + type + ":" + path);
                     }
 
                     if (!type.equals(EventType.None)) {
-                        for (Entry<UUID, ClusterStateCallback> e : callbacks
-                                .entrySet()) {
+                        for (Entry<UUID, ClusterStateCallback> e : callbacks.entrySet()) {
                             ClusterStateCallback fn = e.getValue();
                             fn.execute(type, path);
                         }
@@ -107,17 +102,12 @@ public class DistributedClusterState implements ClusterState {
 
     @SuppressWarnings("unchecked")
     private CuratorFramework mkZk() throws IOException {
-        return zkobj.mkClient(conf,
-                (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS),
-                conf.get(Config.STORM_ZOOKEEPER_PORT), "");
+        return zkobj.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), "");
     }
 
     @SuppressWarnings("unchecked")
-    private CuratorFramework mkZk(WatcherCallBack watcher)
-            throws NumberFormatException, IOException {
-        return zkobj.mkClient(conf,
-                (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS),
-                conf.get(Config.STORM_ZOOKEEPER_PORT),
+    private CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException {
+        return zkobj.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT),
                 String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher);
     }
 
@@ -136,8 +126,7 @@ public class DistributedClusterState implements ClusterState {
     }
 
     @Override
-    public List<String> get_children(String path, boolean watch)
-            throws Exception {
+    public List<String> get_children(String path, boolean watch) throws Exception {
         return zkobj.getChildren(zk, path, watch);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormBase.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormBase.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormBase.java
index e6438dd..6923ab5 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormBase.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormBase.java
@@ -37,8 +37,7 @@ public class StormBase implements Serializable {
     private boolean enableMonitor = true;
     private String group;
 
-    public StormBase(String stormName, int lanchTimeSecs, StormStatus status,
-            String group) {
+    public StormBase(String stormName, int lanchTimeSecs, StormStatus status, String group) {
         this.stormName = stormName;
         this.lanchTimeSecs = lanchTimeSecs;
         this.status = status;
@@ -98,9 +97,7 @@ public class StormBase implements Serializable {
         result = prime * result + ((group == null) ? 0 : group.hashCode());
         result = prime * result + lanchTimeSecs;
         result = prime * result + ((status == null) ? 0 : status.hashCode());
-        result =
-                prime * result
-                        + ((stormName == null) ? 0 : stormName.hashCode());
+        result = prime * result + ((stormName == null) ? 0 : stormName.hashCode());
         return result;
     }
 
@@ -137,8 +134,7 @@ public class StormBase implements Serializable {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java
index 6486d5e..a399bb9 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormClusterState.java
@@ -27,7 +27,10 @@ import com.alibaba.jstorm.schedule.Assignment;
 import com.alibaba.jstorm.schedule.AssignmentBak;
 import com.alibaba.jstorm.task.TaskInfo;
 import com.alibaba.jstorm.task.error.TaskError;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
+import com.alibaba.jstorm.task.backpressure.SourceBackpressureInfo;
+import com.alibaba.jstorm.utils.Pair;
+
+import backtype.storm.generated.TopologyTaskHbInfo;
 
 /**
  * all storm in zk operation interface
@@ -41,30 +44,23 @@ public interface StormClusterState {
 
     public List<String> assignments(RunnableCallback callback) throws Exception;
 
-    public Assignment assignment_info(String topology_id,
-            RunnableCallback callback) throws Exception;
+    public Assignment assignment_info(String topology_id, RunnableCallback callback) throws Exception;
 
-    public void set_assignment(String topology_id, Assignment info)
-            throws Exception;
+    public void set_assignment(String topology_id, Assignment info) throws Exception;
 
     public AssignmentBak assignment_bak(String topologyName) throws Exception;
 
-    public void backup_assignment(String topology_id, AssignmentBak info)
-            throws Exception;
+    public void backup_assignment(String topology_id, AssignmentBak info) throws Exception;
 
     public List<String> active_storms() throws Exception;
 
-    public StormBase storm_base(String topology_id, RunnableCallback callback)
-            throws Exception;
+    public StormBase storm_base(String topology_id, RunnableCallback callback) throws Exception;
 
-    public void activate_storm(String topology_id, StormBase storm_base)
-            throws Exception;
+    public void activate_storm(String topology_id, StormBase storm_base) throws Exception;
 
-    public void update_storm(String topology_id, StormStatus new_elems)
-            throws Exception;
+    public void update_storm(String topology_id, StormStatus new_elems) throws Exception;
 
-    public void set_storm_monitor(String topologyId, boolean isEnable)
-            throws Exception;
+    public void set_storm_monitor(String topologyId, boolean isEnable) throws Exception;
 
     public void remove_storm_base(String topology_id) throws Exception;
 
@@ -72,73 +68,53 @@ public interface StormClusterState {
 
     public Set<Integer> task_ids(String topology_id) throws Exception;
 
-    public Set<Integer> task_ids_by_componentId(String topologyId,
-            String componentId) throws Exception;
+    public Set<Integer> task_ids_by_componentId(String topologyId, String componentId) throws Exception;
 
     public void set_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception;
-    public void add_task(String topology_id, Map<Integer, TaskInfo> taskInfoMap)
-            throws Exception;
+
+    public void add_task(String topology_id, Map<Integer, TaskInfo> taskInfoMap) throws Exception;
 
     public void remove_task(String topologyId, Set<Integer> taskIds) throws Exception;
 
     public Map<Integer, TaskInfo> task_all_info(String topology_id) throws Exception;
 
-    public void setup_heartbeats(String topology_id) throws Exception;
-
     public List<String> heartbeat_storms() throws Exception;
 
-    public List<String> heartbeat_tasks(String topology_id) throws Exception;
-
-    public TaskHeartbeat task_heartbeat(String topology_id, int task_id)
-            throws Exception;
+    public void topology_heartbeat(String topology_id, TopologyTaskHbInfo info) throws Exception;
 
-    public void task_heartbeat(String topology_id, int task_id,
-            TaskHeartbeat info) throws Exception;
+    public TopologyTaskHbInfo topology_heartbeat(String topologyId) throws Exception;
 
     public void teardown_heartbeats(String topology_id) throws Exception;
 
-    public void remove_task_heartbeat(String topology_id, int task_id)
-            throws Exception;
-
     public List<String> task_error_storms() throws Exception;
-    
+
     public List<String> task_error_ids(String topologyId) throws Exception;
 
-    public void report_task_error(String topology_id, int task_id,
-            Throwable error) throws Exception;
+    public void report_task_error(String topology_id, int task_id, Throwable error) throws Exception;
 
-    public void report_task_error(String topology_id, int task_id, String error)
-            throws Exception;
+    public void report_task_error(String topology_id, int task_id, String error, String tag) throws Exception;
 
-    public Map<Integer, String> topo_lastErr_time(String topologyId)
-            throws Exception;
+    public Map<Integer, String> topo_lastErr_time(String topologyId) throws Exception;
 
     public void remove_lastErr_time(String topologyId) throws Exception;
 
-    public List<TaskError> task_errors(String topology_id, int task_id)
-            throws Exception;
+    public List<TaskError> task_errors(String topology_id, int task_id) throws Exception;
 
-    public void remove_task_error(String topologyId, int taskId)
-            throws Exception;
+    public void remove_task_error(String topologyId, int taskId) throws Exception;
 
-    public List<String> task_error_time(String topologyId, int taskId)
-            throws Exception;
+    public List<String> task_error_time(String topologyId, int taskId) throws Exception;
 
-    public String task_error_info(String topologyId, int taskId, long timeStamp)
-            throws Exception;
+    public String task_error_info(String topologyId, int taskId, long timeStamp) throws Exception;
 
     public void teardown_task_errors(String topology_id) throws Exception;
 
     public List<String> supervisors(RunnableCallback callback) throws Exception;
 
-    public SupervisorInfo supervisor_info(String supervisor_id)
-            throws Exception;
+    public SupervisorInfo supervisor_info(String supervisor_id) throws Exception;
 
-    public void supervisor_heartbeat(String supervisor_id, SupervisorInfo info)
-            throws Exception;
+    public void supervisor_heartbeat(String supervisor_id, SupervisorInfo info) throws Exception;
 
-    public boolean try_to_be_leader(String path, String host,
-            RunnableCallback callback) throws Exception;
+    public boolean try_to_be_leader(String path, String host, RunnableCallback callback) throws Exception;
 
     public String get_leader_host() throws Exception;
 
@@ -152,11 +128,25 @@ public interface StormClusterState {
 
     public void unregister_nimbus_host(String host) throws Exception;
 
-    public void set_topology_metric(String topologyId, Object metric)
-            throws Exception;
+    public void update_nimbus_detail(String hostPort, Map map) throws Exception;
+
+    public Map get_nimbus_detail(String hostPort, boolean watch) throws Exception;
+
+    public void unregister_nimbus_detail(String hostPort) throws Exception;
+
+    public void set_topology_metric(String topologyId, Object metric) throws Exception;
 
     public Object get_topology_metric(String topologyId) throws Exception;
-    
+
     public List<String> get_metrics() throws Exception;
 
+    public List<String> list_dirs(String path, boolean watch) throws  Exception;
+
+    public List<String> backpressureInfos() throws Exception;
+
+    public void set_backpressure_info(String topologyId, Map<String, SourceBackpressureInfo> sourceToBackpressureInfo) throws Exception;
+    
+    public Map<String, SourceBackpressureInfo> get_backpressure_info(String topologyId) throws Exception;
+
+    public void teardown_backpressure(String topologyId) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java
index 3d1cd29..f78f52a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormConfig.java
@@ -17,8 +17,18 @@
  */
 package com.alibaba.jstorm.cluster;
 
+import backtype.storm.Config;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.utils.LocalState;
+import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.PathUtils;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -26,21 +36,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.utils.LocalState;
-import backtype.storm.utils.Utils;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.PathUtils;
-
 public class StormConfig {
-    private final static Logger LOG = LoggerFactory
-            .getLogger(StormConfig.class);
+    private final static Logger LOG = LoggerFactory.getLogger(StormConfig.class);
     public final static String RESOURCES_SUBDIR = "resources";
     public final static String WORKER_DATA_SUBDIR = "worker_shared_data";
 
@@ -80,11 +77,10 @@ public class StormConfig {
         return rtn;
     }
 
-    public static HashMap<String, Object> getClassFields(Class<?> cls)
-            throws IllegalArgumentException, IllegalAccessException {
-        java.lang.reflect.Field[] list = cls.getDeclaredFields();
+    public static HashMap<String, Object> getClassFields(Class<?> cls) throws IllegalArgumentException, IllegalAccessException {
+        Field[] list = cls.getDeclaredFields();
         HashMap<String, Object> rtn = new HashMap<String, Object>();
-        for (java.lang.reflect.Field f : list) {
+        for (Field f : list) {
             String name = f.getName();
             rtn.put(name, f.get(null).toString());
 
@@ -98,19 +94,26 @@ public class StormConfig {
 
     }
 
+    /**
+     * please use ConfigExtension.getClusterName(Map conf)
+     */
+    @Deprecated
+    public static String cluster_name(Map conf) {
+        return ConfigExtension.getClusterName(conf);
+    }
+
     public static boolean local_mode(Map conf) {
         String mode = (String) conf.get(Config.STORM_CLUSTER_MODE);
         if (mode != null) {
-            if (mode.equals("local")) {
+            if ("local".equals(mode)) {
                 return true;
             }
 
-            if (mode.equals("distributed")) {
+            if ("distributed".equals(mode)) {
                 return false;
             }
         }
-        throw new IllegalArgumentException("Illegal cluster mode in conf:"
-                + mode);
+        throw new IllegalArgumentException("Illegal cluster mode in conf:" + mode);
 
     }
 
@@ -121,24 +124,20 @@ public class StormConfig {
      */
     public static void validate_distributed_mode(Map<?, ?> conf) {
         if (StormConfig.local_mode(conf)) {
-            throw new IllegalArgumentException(
-                    "Cannot start server in local mode!");
+            throw new IllegalArgumentException("Cannot start server in local mode!");
         }
 
     }
 
     public static void validate_local_mode(Map<?, ?> conf) {
         if (!StormConfig.local_mode(conf)) {
-            throw new IllegalArgumentException(
-                    "Cannot start server in distributed mode!");
+            throw new IllegalArgumentException("Cannot start server in distributed mode!");
         }
 
     }
 
     public static String worker_root(Map conf) throws IOException {
-        String ret =
-                String.valueOf(conf.get(Config.STORM_LOCAL_DIR))
-                        + FILE_SEPERATEOR + "workers";
+        String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "workers";
         FileUtils.forceMkdir(new File(ret));
         return ret;
     }
@@ -149,39 +148,38 @@ public class StormConfig {
         return ret;
     }
 
-    public static String worker_pids_root(Map conf, String id)
-            throws IOException {
+    public static String worker_pids_root(Map conf, String id) throws IOException {
         String ret = worker_root(conf, id) + FILE_SEPERATEOR + "pids";
         FileUtils.forceMkdir(new File(ret));
         return ret;
     }
 
-    public static String worker_pid_path(Map conf, String id, String pid)
-            throws IOException {
+    public static String worker_pid_path(Map conf, String id, String pid) throws IOException {
         String ret = worker_pids_root(conf, id) + FILE_SEPERATEOR + pid;
         return ret;
     }
 
-    public static String worker_heartbeats_root(Map conf, String id)
-            throws IOException {
+    public static String worker_heartbeats_root(Map conf, String id) throws IOException {
         String ret = worker_root(conf, id) + FILE_SEPERATEOR + "heartbeats";
         FileUtils.forceMkdir(new File(ret));
         return ret;
     }
 
     public static String default_worker_shared_dir(Map conf) throws IOException {
-        String ret =
-                String.valueOf(conf.get(Config.STORM_LOCAL_DIR))
-                        + FILE_SEPERATEOR + WORKER_DATA_SUBDIR;
+        String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + WORKER_DATA_SUBDIR;
 
         FileUtils.forceMkdir(new File(ret));
         return ret;
     }
 
+    private static String drpc_local_dir(Map conf) throws IOException {
+        String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "drpc";
+        FileUtils.forceMkdir(new File(ret));
+        return ret;
+    }
+
     private static String supervisor_local_dir(Map conf) throws IOException {
-        String ret =
-                String.valueOf(conf.get(Config.STORM_LOCAL_DIR))
-                        + FILE_SEPERATEOR + "supervisor";
+        String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "supervisor";
         FileUtils.forceMkdir(new File(ret));
         return ret;
     }
@@ -192,8 +190,7 @@ public class StormConfig {
         return ret;
     }
 
-    public static String supervisor_stormdist_root(Map conf, String topologyId)
-            throws IOException {
+    public static String supervisor_stormdist_root(Map conf, String topologyId) throws IOException {
         return supervisor_stormdist_root(conf) + FILE_SEPERATEOR + topologyId;
     }
 
@@ -216,17 +213,32 @@ public class StormConfig {
     }
 
     /**
+     * Return drpc's pid dir
+     *
+     * @param conf
+     * @return
+     * @throws IOException
+     */
+    public static String drpcPids(Map conf) throws IOException {
+        String ret = drpc_local_dir(conf) + FILE_SEPERATEOR + "pids";
+        try {
+            FileUtils.forceMkdir(new File(ret));
+        } catch (IOException e) {
+            LOG.error("Failed to create dir " + ret, e);
+            throw e;
+        }
+        return ret;
+    }
+
+    /**
      * Return nimbus's heartbeat dir for apsara
      * 
      * @param conf
      * @return
      * @throws IOException
      */
-    public static String supervisorHearbeatForContainer(Map conf)
-            throws IOException {
-        String ret =
-                supervisor_local_dir(conf) + FILE_SEPERATEOR
-                        + "supervisor.heartbeat";
+    public static String supervisorHearbeatForContainer(Map conf) throws IOException {
+        String ret = supervisor_local_dir(conf) + FILE_SEPERATEOR + "supervisor.heartbeat";
         try {
             FileUtils.forceMkdir(new File(ret));
         } catch (IOException e) {
@@ -272,8 +284,7 @@ public class StormConfig {
         return stormroot + FILE_SEPERATEOR + "timestamp";
     }
 
-    public static LocalState worker_state(Map conf, String id)
-            throws IOException {
+    public static LocalState worker_state(Map conf, String id) throws IOException {
         String path = worker_heartbeats_root(conf, id);
 
         LocalState rtn = new LocalState(path);
@@ -282,9 +293,18 @@ public class StormConfig {
     }
 
     public static String masterLocalDir(Map conf) throws IOException {
-        String ret =
-                String.valueOf(conf.get(Config.STORM_LOCAL_DIR))
-                        + FILE_SEPERATEOR + "nimbus";
+        String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "nimbus";
+        try {
+            FileUtils.forceMkdir(new File(ret));
+        } catch (IOException e) {
+            LOG.error("Failed to create dir " + ret, e);
+            throw e;
+        }
+        return ret;
+    }
+
+    public static String metricLocalDir(Map conf) throws IOException {
+        String ret = String.valueOf(conf.get(Config.STORM_LOCAL_DIR)) + FILE_SEPERATEOR + "metrics";
         try {
             FileUtils.forceMkdir(new File(ret));
         } catch (IOException e) {
@@ -300,8 +320,7 @@ public class StormConfig {
         return ret;
     }
 
-    public static String masterStormdistRoot(Map conf, String topologyId)
-            throws IOException {
+    public static String masterStormdistRoot(Map conf, String topologyId) throws IOException {
         return masterStormdistRoot(conf) + FILE_SEPERATEOR + topologyId;
     }
 
@@ -311,8 +330,7 @@ public class StormConfig {
         return ret;
     }
 
-    public static String masterStormTmpRoot(Map conf, String topologyId)
-            throws IOException {
+    public static String masterStormTmpRoot(Map conf, String topologyId) throws IOException {
         return masterStormTmpRoot(conf) + FILE_SEPERATEOR + topologyId;
     }
 
@@ -363,10 +381,8 @@ public class StormConfig {
      * @return
      * @throws IOException
      */
-    public static String masterHearbeatForContainer(Map conf)
-            throws IOException {
-        String ret =
-                masterLocalDir(conf) + FILE_SEPERATEOR + "nimbus.heartbeat";
+    public static String masterHearbeatForContainer(Map conf) throws IOException {
+        String ret = masterLocalDir(conf) + FILE_SEPERATEOR + "nimbus.heartbeat";
         try {
             FileUtils.forceMkdir(new File(ret));
         } catch (IOException e) {
@@ -375,11 +391,15 @@ public class StormConfig {
         }
         return ret;
     }
-    
+
     public static String masterDbDir(Map conf) throws IOException {
         return masterLocalDir(conf) + FILE_SEPERATEOR + "rocksdb";
     }
 
+    public static String metricDbDir(Map conf) throws IOException {
+        return metricLocalDir(conf) + FILE_SEPERATEOR + "rocksdb";
+    }
+
     public static String supervisorTmpDir(Map conf) throws IOException {
         String ret = null;
         try {
@@ -397,8 +417,7 @@ public class StormConfig {
     public static LocalState supervisorState(Map conf) throws IOException {
         LocalState localState = null;
         try {
-            String localstateDir =
-                    supervisor_local_dir(conf) + FILE_SEPERATEOR + "localstate";
+            String localstateDir = supervisor_local_dir(conf) + FILE_SEPERATEOR + "localstate";
             FileUtils.forceMkdir(new File(localstateDir));
             localState = new LocalState(localstateDir);
         } catch (IOException e) {
@@ -416,25 +435,20 @@ public class StormConfig {
      * @return
      * @throws IOException
      */
-    public static Map read_supervisor_topology_conf(Map conf, String topologyId)
-            throws IOException {
-        String topologyRoot =
-                StormConfig.supervisor_stormdist_root(conf, topologyId);
+    public static Map read_supervisor_topology_conf(Map conf, String topologyId) throws IOException {
+        String topologyRoot = StormConfig.supervisor_stormdist_root(conf, topologyId);
         String confPath = StormConfig.stormconf_path(topologyRoot);
         return (Map) readLocalObject(topologyId, confPath);
     }
 
-    public static StormTopology read_supervisor_topology_code(Map conf,
-            String topologyId) throws IOException {
-        String topologyRoot =
-                StormConfig.supervisor_stormdist_root(conf, topologyId);
+    public static StormTopology read_supervisor_topology_code(Map conf, String topologyId) throws IOException {
+        String topologyRoot = StormConfig.supervisor_stormdist_root(conf, topologyId);
         String codePath = StormConfig.stormcode_path(topologyRoot);
         return (StormTopology) readLocalObject(topologyId, codePath);
     }
 
     @SuppressWarnings("rawtypes")
-    public static List<String> get_supervisor_toplogy_list(Map conf)
-            throws IOException {
+    public static List<String> get_supervisor_toplogy_list(Map conf) throws IOException {
 
         // get the path: STORM-LOCAL-DIR/supervisor/stormdist/
         String path = StormConfig.supervisor_stormdist_root(conf);
@@ -444,48 +458,40 @@ public class StormConfig {
         return topologyids;
     }
 
-    public static Map read_nimbus_topology_conf(Map conf, String topologyId)
-            throws IOException {
+    public static Map read_nimbus_topology_conf(Map conf, String topologyId) throws IOException {
         String topologyRoot = StormConfig.masterStormdistRoot(conf, topologyId);
         return read_topology_conf(topologyRoot, topologyId);
     }
 
-    public static void write_nimbus_topology_conf(Map conf, String topologyId,
-            Map topoConf) throws IOException {
+    public static void write_nimbus_topology_conf(Map conf, String topologyId, Map topoConf) throws IOException {
         String topologyRoot = StormConfig.masterStormdistRoot(conf, topologyId);
         String confPath = StormConfig.stormconf_path(topologyRoot);
-        FileUtils.writeByteArrayToFile(new File(confPath),
-                Utils.serialize(topoConf));
+        FileUtils.writeByteArrayToFile(new File(confPath), Utils.serialize(topoConf));
     }
 
-    public static Map read_nimbusTmp_topology_conf(Map conf, String topologyId)
-            throws IOException {
+    public static Map read_nimbusTmp_topology_conf(Map conf, String topologyId) throws IOException {
         String topologyRoot = StormConfig.masterStormTmpRoot(conf, topologyId);
         return read_topology_conf(topologyRoot, topologyId);
     }
 
-    public static Map read_topology_conf(String topologyRoot, String topologyId)
-            throws IOException {
+    public static Map read_topology_conf(String topologyRoot, String topologyId) throws IOException {
         String readFile = StormConfig.stormconf_path(topologyRoot);
         return (Map) readLocalObject(topologyId, readFile);
     }
 
-    public static StormTopology read_nimbus_topology_code(Map conf,
-            String topologyId) throws IOException {
+    public static StormTopology read_nimbus_topology_code(Map conf, String topologyId) throws IOException {
         String topologyRoot = StormConfig.masterStormdistRoot(conf, topologyId);
         String codePath = StormConfig.stormcode_path(topologyRoot);
         return (StormTopology) readLocalObject(topologyId, codePath);
     }
 
-    public static void write_nimbus_topology_code(Map conf, String topologyId,
-            byte[] data) throws IOException {
+    public static void write_nimbus_topology_code(Map conf, String topologyId, byte[] data) throws IOException {
         String topologyRoot = StormConfig.masterStormdistRoot(conf, topologyId);
         String codePath = StormConfig.stormcode_path(topologyRoot);
         FileUtils.writeByteArrayToFile(new File(codePath), data);
     }
 
-    public static long read_supervisor_topology_timestamp(Map conf,
-            String topologyId) throws IOException {
+    public static long read_supervisor_topology_timestamp(Map conf, String topologyId) throws IOException {
         String stormRoot = supervisor_stormdist_root(conf, topologyId);
         String timeStampPath = stormts_path(stormRoot);
 
@@ -493,8 +499,7 @@ public class StormConfig {
         return JStormUtils.bytesToLong(data);
     }
 
-    public static void write_supervisor_topology_timestamp(Map conf,
-            String topologyId, long timeStamp) throws IOException {
+    public static void write_supervisor_topology_timestamp(Map conf, String topologyId, long timeStamp) throws IOException {
         String stormRoot = supervisor_stormdist_root(conf, topologyId);
         String timeStampPath = stormts_path(stormRoot);
 
@@ -502,6 +507,22 @@ public class StormConfig {
         FileUtils.writeByteArrayToFile(new File(timeStampPath), data);
     }
 
+    public static long read_nimbus_topology_timestamp(Map conf, String topologyId) throws IOException {
+        String stormRoot = masterStormdistRoot(conf, topologyId);
+        String timeStampPath = stormts_path(stormRoot);
+
+        byte[] data = FileUtils.readFileToByteArray(new File(timeStampPath));
+        return JStormUtils.bytesToLong(data);
+    }
+
+    public static void write_nimbus_topology_timestamp(Map conf, String topologyId, long timeStamp) throws IOException {
+        String stormRoot = masterStormdistRoot(conf, topologyId);
+        String timeStampPath = stormts_path(stormRoot);
+
+        byte[] data = JStormUtils.longToBytes(timeStamp);
+        FileUtils.writeByteArrayToFile(new File(timeStampPath), data);
+    }
+
     /**
      * stormconf has mergered into clusterconf
      * 
@@ -511,12 +532,9 @@ public class StormConfig {
      * @throws IOException
      */
     @SuppressWarnings("unchecked")
-    public static Object readLocalObject(String topologyId, String readFile)
-            throws IOException {
+    public static Object readLocalObject(String topologyId, String readFile) throws IOException {
 
-        String errMsg =
-                "Failed to get topology configuration of " + topologyId
-                        + " file:" + readFile;
+        String errMsg = "Failed to get topology configuration of " + topologyId + " file:" + readFile;
 
         byte[] bconf = FileUtils.readFileToByteArray(new File(readFile));
         if (bconf == null) {
@@ -537,10 +555,8 @@ public class StormConfig {
         return ret;
     }
 
-    public static long get_supervisor_topology_Bianrymodify_time(Map conf,
-            String topologyId) throws IOException {
-        String topologyRoot =
-                StormConfig.supervisor_stormdist_root(conf, topologyId);
+    public static long get_supervisor_topology_Bianrymodify_time(Map conf, String topologyId) throws IOException {
+        String topologyRoot = StormConfig.supervisor_stormdist_root(conf, topologyId);
         File f = new File(topologyRoot);
         long modifyTime = f.lastModified();
         return modifyTime;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormMonitor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormMonitor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormMonitor.java
index 935a638..c92b362 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormMonitor.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormMonitor.java
@@ -44,7 +44,6 @@ public class StormMonitor implements Serializable {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 }
\ No newline at end of file