Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:55 UTC

[17/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java
index 02b574f..3871074 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusData.java
@@ -17,36 +17,37 @@
  */
 package com.alibaba.jstorm.daemon.nimbus;
 
-import java.io.IOException;
-import java.nio.channels.Channel;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import backtype.storm.Config;
+import backtype.storm.generated.TopologyTaskHbInfo;
+import backtype.storm.scheduler.INimbus;
+import backtype.storm.utils.BufferFileInputStream;
+import backtype.storm.utils.TimeCacheMap;
 import com.alibaba.jstorm.cache.JStormCache;
+import com.alibaba.jstorm.callback.AsyncLoopThread;
+import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.cluster.Cluster;
 import com.alibaba.jstorm.cluster.StormClusterState;
 import com.alibaba.jstorm.cluster.StormConfig;
 import com.alibaba.jstorm.cluster.StormZkClusterState;
+import com.alibaba.jstorm.metric.JStormMetricCache;
+import com.alibaba.jstorm.metric.JStormMetricsReporter;
 import com.alibaba.jstorm.task.TkHbCacheTime;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import backtype.storm.Config;
-import backtype.storm.scheduler.INimbus;
-import backtype.storm.utils.BufferFileInputStream;
-import backtype.storm.utils.TimeCacheMap;
+import java.io.IOException;
+import java.nio.channels.Channel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * All nimbus data
- * 
  */
 public class NimbusData {
     private static final Logger LOG = LoggerFactory.getLogger(NimbusData.class);
@@ -62,7 +63,7 @@ public class NimbusData {
     private TimeCacheMap<Object, Object> downloaders;
     private TimeCacheMap<Object, Object> uploaders;
     // cache thrift responses to avoid scanning zk too frequently
-    private NimbusCache cache;
+    private NimbusCache nimbusCache;
 
     private int startTime;
 
@@ -82,17 +83,24 @@ public class NimbusData {
 
     private AtomicBoolean isShutdown = new AtomicBoolean(false);
 
-    private final TopologyMetricsRunnable metricRunnable;
+    private TopologyMetricsRunnable metricRunnable;
+    private AsyncLoopThread metricLoopThread;
 
     // Topologies that have been submitted, but whose assignment is not
     // finished
     private TimeCacheMap<String, Object> pendingSubmitTopologys;
-
     private Map<String, Integer> topologyTaskTimeout;
-    
-    private TopologyNettyMgr topologyNettyMgr ;
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    // Map<TopologyId, TasksHeartbeat>
+    private Map<String, TopologyTaskHbInfo> tasksHeartbeat;
+
+    private final JStormMetricCache metricCache;
+
+    private final String clusterName;
+
+    private JStormMetricsReporter metricsReporter;
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
     public NimbusData(Map conf, INimbus inimbus) throws Exception {
         this.conf = conf;
 
@@ -104,8 +112,7 @@ public class NimbusData {
 
         createCache();
 
-        this.taskHeartbeatsCache =
-                new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>();
+        this.taskHeartbeatsCache = new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>();
 
         this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM);
 
@@ -117,66 +124,63 @@ public class NimbusData {
 
         localMode = StormConfig.local_mode(conf);
 
-        this.topologyNettyMgr = new TopologyNettyMgr(conf);
+        this.metricCache = new JStormMetricCache(conf, this.stormClusterState);
+        this.clusterName = ConfigExtension.getClusterName(conf);
+
         this.metricRunnable = new TopologyMetricsRunnable(this);
+        this.metricRunnable.init();
 
-        pendingSubmitTopologys =
-                new TimeCacheMap<String, Object>(JStormUtils.MIN_30);
-        
+        pendingSubmitTopologys = new TimeCacheMap<String, Object>(JStormUtils.MIN_30);
         topologyTaskTimeout = new ConcurrentHashMap<String, Integer>();
+        tasksHeartbeat = new ConcurrentHashMap<String, TopologyTaskHbInfo>();
+
+        if (!localMode) {
+            startMetricThreads();
+        }
     }
 
-    /**
-     * Just for test
-     */
-    public NimbusData() {
-        scheduExec = Executors.newScheduledThreadPool(6);
+    public void startMetricThreads() {
+        this.metricRunnable.start();
 
-        inimubs = null;
-        conf = new HashMap<Object, Object>();
-        localMode = false;
-        this.metricRunnable = new TopologyMetricsRunnable(this);
+        // init nimbus metric reporter
+        this.metricsReporter = new JStormMetricsReporter(this);
+        this.metricsReporter.init();
     }
 
     public void createFileHandler() {
-        TimeCacheMap.ExpiredCallback<Object, Object> expiredCallback =
-                new TimeCacheMap.ExpiredCallback<Object, Object>() {
-                    @Override
-                    public void expire(Object key, Object val) {
-                        try {
-                            LOG.info("Close file " + String.valueOf(key));
-                            if (val != null) {
-                                if (val instanceof Channel) {
-                                    Channel channel = (Channel) val;
-                                    channel.close();
-                                } else if (val instanceof BufferFileInputStream) {
-                                    BufferFileInputStream is =
-                                            (BufferFileInputStream) val;
-                                    is.close();
-                                }
-                            }
-                        } catch (IOException e) {
-                            LOG.error(e.getMessage(), e);
+        TimeCacheMap.ExpiredCallback<Object, Object> expiredCallback = new TimeCacheMap.ExpiredCallback<Object, Object>() {
+            @Override
+            public void expire(Object key, Object val) {
+                try {
+                    LOG.info("Close file " + String.valueOf(key));
+                    if (val != null) {
+                        if (val instanceof Channel) {
+                            Channel channel = (Channel) val;
+                            channel.close();
+                        } else if (val instanceof BufferFileInputStream) {
+                            BufferFileInputStream is = (BufferFileInputStream) val;
+                            is.close();
                         }
-
                     }
-                };
+                } catch (IOException e) {
+                    LOG.error(e.getMessage(), e);
+                }
+
+            }
+        };
 
-        int file_copy_expiration_secs =
-                JStormUtils.parseInt(
-                        conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30);
-        uploaders =
-                new TimeCacheMap<Object, Object>(file_copy_expiration_secs,
-                        expiredCallback);
-        downloaders =
-                new TimeCacheMap<Object, Object>(file_copy_expiration_secs,
-                        expiredCallback);
+        int file_copy_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30);
+        uploaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback);
+        downloaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback);
     }
 
     public void createCache() throws IOException {
-        cache = new NimbusCache(conf, stormClusterState);
-        
-        ((StormZkClusterState) stormClusterState).setCache(cache.getMemCache());
+        nimbusCache = new NimbusCache(conf, stormClusterState);
+        ((StormZkClusterState) stormClusterState).setCache(nimbusCache.getMemCache());
+    }
+
+    public String getClusterName() {
+        return clusterName;
     }
 
     public int uptime() {
@@ -203,8 +207,7 @@ public class NimbusData {
         return taskHeartbeatsCache;
     }
 
-    public Map<Integer, TkHbCacheTime> getTaskHeartbeatsCache(
-            String topologyId, boolean createIfNotExist) {
+    public Map<Integer, TkHbCacheTime> getTaskHeartbeatsCache(String topologyId, boolean createIfNotExist) {
         Map<Integer, TkHbCacheTime> ret = null;
         ret = taskHeartbeatsCache.get(topologyId);
         if (ret == null && createIfNotExist) {
@@ -214,8 +217,7 @@ public class NimbusData {
         return ret;
     }
 
-    public void setTaskHeartbeatsCache(
-            ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache) {
+    public void setTaskHeartbeatsCache(ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache) {
         this.taskHeartbeatsCache = taskHeartbeatsCache;
     }
 
@@ -256,7 +258,7 @@ public class NimbusData {
     }
 
     public void cleanup() {
-        cache.cleanup();
+        nimbusCache.cleanup();
         LOG.info("Successfully shutdown Cache");
         try {
             stormClusterState.disconnect();
@@ -296,15 +298,19 @@ public class NimbusData {
     }
 
     public JStormCache getMemCache() {
-        return cache.getMemCache();
+        return nimbusCache.getMemCache();
     }
-    
+
     public JStormCache getDbCache() {
-        return cache.getDbCache();
+        return nimbusCache.getDbCache();
     }
-    
+
     public NimbusCache getNimbusCache() {
-        return cache;
+        return nimbusCache;
+    }
+
+    public JStormMetricCache getMetricCache() {
+        return metricCache;
     }
 
     public final TopologyMetricsRunnable getMetricRunnable() {
@@ -319,9 +325,7 @@ public class NimbusData {
         return topologyTaskTimeout;
     }
 
-	public TopologyNettyMgr getTopologyNettyMgr() {
-		return topologyNettyMgr;
-	}
-    
-    
+    public Map<String, TopologyTaskHbInfo> getTasksHeartbeat() {
+        return tasksHeartbeat;
+    }
 }

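The NimbusData changes above rename the thrift-response cache field from cache to nimbusCache, add a JStormMetricCache and a per-topology Map<String, TopologyTaskHbInfo> of task heartbeats, and move metric startup into startMetricThreads(). The createFileHandler() hunk also shows the TimeCacheMap expiry-callback pattern used to auto-close stale upload/download handles. Below is a minimal sketch of that pattern, assuming the backtype.storm.utils classes from this diff are on the classpath; the key and the callback body are placeholders, not JStorm's:

import backtype.storm.utils.TimeCacheMap;

public class UploaderCacheSketch {
    public static void main(String[] args) {
        // Entries not touched within the expiration window are evicted and handed
        // to the callback -- the point where NimbusData closes the Channel or
        // BufferFileInputStream behind the expired upload/download session.
        TimeCacheMap.ExpiredCallback<Object, Object> expiredCallback =
                new TimeCacheMap.ExpiredCallback<Object, Object>() {
                    @Override
                    public void expire(Object key, Object val) {
                        System.out.println("Close file " + key); // placeholder cleanup
                    }
                };

        // 30 is the NIMBUS_FILE_COPY_EXPIRATION_SECS default read in createFileHandler()
        TimeCacheMap<Object, Object> uploaders = new TimeCacheMap<Object, Object>(30, expiredCallback);
        uploaders.put("upload-1", new Object());
    }
}
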
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java
index b22088e..5d5e18c 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusServer.java
@@ -17,59 +17,51 @@
  */
 package com.alibaba.jstorm.daemon.nimbus;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
 import backtype.storm.generated.Nimbus;
 import backtype.storm.generated.Nimbus.Iface;
 import backtype.storm.scheduler.INimbus;
 import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.callback.AsyncLoopRunnable;
 import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.cluster.StormConfig;
 import com.alibaba.jstorm.daemon.supervisor.Httpserver;
 import com.alibaba.jstorm.daemon.worker.hearbeat.SyncContainerHb;
-import com.alibaba.jstorm.metric.SimpleJStormMetric;
 import com.alibaba.jstorm.schedule.CleanRunnable;
 import com.alibaba.jstorm.schedule.FollowerRunnable;
 import com.alibaba.jstorm.schedule.MonitorRunnable;
 import com.alibaba.jstorm.utils.JStormServerUtils;
 import com.alibaba.jstorm.utils.JStormUtils;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * 
- * NimbusServer work flow: 1. cleanup interrupted topology delete
- * /storm-local-dir/nimbus/topologyid/stormdis delete
- * /storm-zk-root/storms/topologyid
+ * NimbusServer workflow: 1. clean up interrupted topologies: delete /storm-local-dir/nimbus/topologyid/stormdist and /storm-zk-root/storms/topologyid
  * 
  * 2. set /storm-zk-root/storms/topology status as run
  * 
- * 3. start one thread, every nimbus.monitor.reeq.secs set
- * /storm-zk-root/storms/ all topology as monitor. when the topology's status is
- * monitor, nimubs would reassign workers 4. start one threa, every
- * nimubs.cleanup.inbox.freq.secs cleanup useless jar
+ * 3. start one thread: every nimbus.monitor.freq.secs, set all topologies under /storm-zk-root/storms/ as monitor; when a topology's status is monitor, nimbus
+ * reassigns its workers. 4. start one thread: every nimbus.cleanup.inbox.freq.secs, clean up useless jars
  * 
  * @author version 1: Nathan Marz version 2: Lixin/Chenjun version 3: Longda
  * 
  */
 public class NimbusServer {
 
-    private static final Logger LOG = LoggerFactory
-            .getLogger(NimbusServer.class);
+    private static final Logger LOG = LoggerFactory.getLogger(NimbusServer.class);
 
     private NimbusData data;
 
@@ -83,8 +75,7 @@ public class NimbusServer {
 
     private Httpserver hs;
 
-    private List<AsyncLoopThread> smartThreads =
-            new ArrayList<AsyncLoopThread>();
+    private List<AsyncLoopThread> smartThreads = new ArrayList<AsyncLoopThread>();
 
     public static void main(String[] args) throws Exception {
         // read configuration files
@@ -134,8 +125,6 @@ public class NimbusServer {
             while (!data.isLeader())
                 Utils.sleep(5000);
 
-            initUploadMetricThread(data);
-
             init(conf);
         } catch (Throwable e) {
             LOG.error("Fail to run nimbus ", e);
@@ -146,8 +135,7 @@ public class NimbusServer {
         LOG.info("Quit nimbus");
     }
 
-    public ServiceHandler launcherLocalServer(final Map conf, INimbus inimbus)
-            throws Exception {
+    public ServiceHandler launcherLocalServer(final Map conf, INimbus inimbus) throws Exception {
         LOG.info("Begin to start nimbus in local mode");
 
         StormConfig.validate_local_mode(conf);
@@ -156,9 +144,6 @@ public class NimbusServer {
 
         data = createNimbusData(conf, inimbus);
 
-        // @@@ testing
-        initUploadMetricThread(data);
-
         init(conf);
 
         return serviceHandler;
@@ -184,6 +169,8 @@ public class NimbusServer {
         serviceHandler = new ServiceHandler(data);
 
         if (!data.isLocalMode()) {
+        	
+        	//data.startMetricThreads();
 
             initMonitor(conf);
 
@@ -193,15 +180,11 @@ public class NimbusServer {
     }
 
     @SuppressWarnings("rawtypes")
-    private NimbusData createNimbusData(Map conf, INimbus inimbus)
-            throws Exception {
+    private NimbusData createNimbusData(Map conf, INimbus inimbus) throws Exception {
 
         // Callback callback=new TimerCallBack();
         // StormTimer timer=Timer.mkTimerTimer(callback);
-        NimbusData data = new NimbusData(conf, inimbus);
-
-        return data;
-
+        return new NimbusData(conf, inimbus);
     }
 
     private void initTopologyAssign() {
@@ -218,9 +201,9 @@ public class NimbusServer {
             for (String topologyid : active_ids) {
                 // set the topology status as startup
                 // in fact, startup won't change anything
-                NimbusUtils.transition(data, topologyid, false,
-                        StatusType.startup);
+                NimbusUtils.transition(data, topologyid, false, StatusType.startup);
                 NimbusUtils.updateTopologyTaskTimeout(data, topologyid);
+                NimbusUtils.updateTopologyTaskHb(data, topologyid);
             }
 
         }
@@ -235,20 +218,15 @@ public class NimbusServer {
         // Schedule Nimbus monitor
         MonitorRunnable r1 = new MonitorRunnable(data);
 
-        int monitor_freq_secs =
-                JStormUtils.parseInt(conf.get(Config.NIMBUS_MONITOR_FREQ_SECS),
-                        10);
-        scheduExec.scheduleAtFixedRate(r1, 0, monitor_freq_secs,
-                TimeUnit.SECONDS);
+        int monitor_freq_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_MONITOR_FREQ_SECS), 10);
+        scheduExec.scheduleAtFixedRate(r1, 0, monitor_freq_secs, TimeUnit.SECONDS);
 
         LOG.info("Successfully init Monitor thread");
     }
 
     /**
-     * Right now, every 600 seconds, nimbus will clean jar under
-     * /LOCAL-DIR/nimbus/inbox, which is the uploading topology directory
+     * Right now, every 600 seconds, nimbus will clean jars under /LOCAL-DIR/nimbus/inbox, the directory where topology jars are uploaded
      * 
-     * @param conf
      * @throws IOException
      */
     @SuppressWarnings("rawtypes")
@@ -257,39 +235,25 @@ public class NimbusServer {
 
         // Schedule Nimbus inbox cleaner for jars under /nimbus/inbox
         String dir_location = StormConfig.masterInbox(conf);
-        int inbox_jar_expiration_secs =
-                JStormUtils
-                        .parseInt(conf
-                                .get(Config.NIMBUS_INBOX_JAR_EXPIRATION_SECS),
-                                3600);
-        CleanRunnable r2 =
-                new CleanRunnable(dir_location, inbox_jar_expiration_secs);
+        int inbox_jar_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_INBOX_JAR_EXPIRATION_SECS), 3600);
+        CleanRunnable r2 = new CleanRunnable(dir_location, inbox_jar_expiration_secs);
 
-        int cleanup_inbox_freq_secs =
-                JStormUtils.parseInt(
-                        conf.get(Config.NIMBUS_CLEANUP_INBOX_FREQ_SECS), 600);
-
-        scheduExec.scheduleAtFixedRate(r2, 0, cleanup_inbox_freq_secs,
-                TimeUnit.SECONDS);
+        int cleanup_inbox_freq_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_CLEANUP_INBOX_FREQ_SECS), 600);
 
+        scheduExec.scheduleAtFixedRate(r2, 0, cleanup_inbox_freq_secs, TimeUnit.SECONDS);
         LOG.info("Successfully init " + dir_location + " cleaner");
     }
 
     @SuppressWarnings("rawtypes")
     private void initThrift(Map conf) throws TTransportException {
-        Integer thrift_port =
-                JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT));
-        TNonblockingServerSocket socket =
-                new TNonblockingServerSocket(thrift_port);
+        Integer thrift_port = JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT));
+        TNonblockingServerSocket socket = new TNonblockingServerSocket(thrift_port);
 
-        Integer maxReadBufSize =
-                JStormUtils.parseInt(conf
-                        .get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE));
+        Integer maxReadBufSize = JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE));
 
         THsHaServer.Args args = new THsHaServer.Args(socket);
         args.workerThreads(ServiceHandler.THREAD_NUM);
-        args.protocolFactory(new TBinaryProtocol.Factory(false, true,
-                maxReadBufSize, -1));
+        args.protocolFactory(new TBinaryProtocol.Factory(false, true, maxReadBufSize, -1));
 
         args.processor(new Nimbus.Processor<Iface>(serviceHandler));
         args.maxReadBufferBytes = maxReadBufSize;
@@ -317,53 +281,15 @@ public class NimbusServer {
         });
     }
 
-    private void initUploadMetricThread(NimbusData data) {
-        final TopologyMetricsRunnable metricRunnable = data.getMetricRunnable();
-        
-        int threadNum = ConfigExtension.getNimbusMetricThreadNum(data.getConf());
-        
-        for (int i = 0; i < threadNum; i++) {
-            AsyncLoopThread thread = new AsyncLoopThread(metricRunnable);
-            smartThreads.add(thread);
-        }
-        
-        Runnable pusher = new Runnable() {
-
-            @Override
-            public void run() {
-                // TODO Auto-generated method stub
-                TopologyMetricsRunnable.Upload event =
-                        new TopologyMetricsRunnable.Upload();
-                event.timeStamp = System.currentTimeMillis();
-
-                metricRunnable.pushEvent(event);
-            }
-
-        };
-
-        ScheduledExecutorService scheduleService = data.getScheduExec();
-        scheduleService.scheduleAtFixedRate(pusher, 120, 60,
-                TimeUnit.SECONDS);
-        
-        SimpleJStormMetric nimbusMetric = SimpleJStormMetric.mkInstance();
-        scheduleService.scheduleAtFixedRate(nimbusMetric, 120, 60,
-                        TimeUnit.SECONDS);
-        
-        //AsyncLoopThread nimbusCacheThread = new AsyncLoopThread(data.getNimbusCache().getCacheRunnable());
-        //smartThreads.add(nimbusCacheThread);
-
-        LOG.info("Successfully init metrics uploading thread");
-    }
-
     public void cleanup() {
-        if (data.getIsShutdown().getAndSet(true) == true) {
+        if (data.getIsShutdown().getAndSet(true)) {
             LOG.info("Notify to quit nimbus");
             return;
         }
 
         LOG.info("Begin to shutdown nimbus");
         AsyncLoopRunnable.getShutdown().set(true);
-        
+
         data.getScheduExec().shutdownNow();
 
         for (AsyncLoopThread t : smartThreads) {

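The NimbusServer diff above removes initUploadMetricThread() (the metric threads are now started from NimbusData.startMetricThreads()) and keeps the two periodic jobs: MonitorRunnable every NIMBUS_MONITOR_FREQ_SECS (default 10) and CleanRunnable every NIMBUS_CLEANUP_INBOX_FREQ_SECS (default 600), both on the shared ScheduledExecutorService. A self-contained sketch of that scheduling pattern using plain JDK types; the runnable bodies are placeholders, not JStorm's:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class NimbusSchedulerSketch {
    public static void main(String[] args) {
        ScheduledExecutorService scheduExec = Executors.newScheduledThreadPool(2);

        // Stand-in for MonitorRunnable: checks task liveness and triggers reassignment.
        Runnable monitor = () -> System.out.println("monitor pass");
        // Stand-in for CleanRunnable: deletes expired jars under /LOCAL-DIR/nimbus/inbox.
        Runnable cleaner = () -> System.out.println("inbox cleanup pass");

        scheduExec.scheduleAtFixedRate(monitor, 0, 10, TimeUnit.SECONDS);  // nimbus.monitor.freq.secs default
        scheduExec.scheduleAtFixedRate(cleaner, 0, 600, TimeUnit.SECONDS); // nimbus.cleanup.inbox.freq.secs default
    }
}
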
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java
index 7181e77..4e032e3 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/NimbusUtils.java
@@ -17,53 +17,31 @@
  */
 package com.alibaba.jstorm.daemon.nimbus;
 
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.security.InvalidParameterException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.Config;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.NimbusStat;
-import backtype.storm.generated.NimbusSummary;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SupervisorSummary;
-import backtype.storm.generated.TopologySummary;
+import backtype.storm.generated.*;
 import backtype.storm.utils.ThriftTopologyUtils;
 import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.cluster.Cluster;
+import com.alibaba.jstorm.cluster.Common;
 import com.alibaba.jstorm.cluster.StormBase;
 import com.alibaba.jstorm.cluster.StormClusterState;
 import com.alibaba.jstorm.cluster.StormConfig;
 import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
 import com.alibaba.jstorm.schedule.Assignment;
 import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
+import com.alibaba.jstorm.task.TaskInfo;
 import com.alibaba.jstorm.task.TkHbCacheTime;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.PathUtils;
 import com.alibaba.jstorm.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.InvalidParameterException;
+import java.util.*;
+import java.util.Map.Entry;
 
 public class NimbusUtils {
 
@@ -71,7 +49,6 @@ public class NimbusUtils {
 
     /**
      * add custom KRYO serialization
-     * 
      */
     private static Map mapifySerializations(List sers) {
         Map rtn = new HashMap();
@@ -92,8 +69,6 @@ public class NimbusUtils {
     /**
      * Normalize stormConf
      * 
-     * 
-     * 
      * @param conf
      * @param stormConf
      * @param topology
@@ -101,8 +76,7 @@ public class NimbusUtils {
      * @throws Exception
      */
     @SuppressWarnings("rawtypes")
-    public static Map normalizeConf(Map conf, Map stormConf,
-            StormTopology topology) throws Exception {
+    public static Map normalizeConf(Map conf, Map stormConf, StormTopology topology) throws Exception {
 
         List kryoRegisterList = new ArrayList();
         List kryoDecoratorList = new ArrayList();
@@ -113,18 +87,14 @@ public class NimbusUtils {
 
         Object totalRegister = totalConf.get(Config.TOPOLOGY_KRYO_REGISTER);
         if (totalRegister != null) {
-            LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME)
-                    + ", TOPOLOGY_KRYO_REGISTER"
-                    + totalRegister.getClass().getName());
+            LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) + ", TOPOLOGY_KRYO_REGISTER" + totalRegister.getClass().getName());
 
             JStormUtils.mergeList(kryoRegisterList, totalRegister);
         }
 
         Object totalDecorator = totalConf.get(Config.TOPOLOGY_KRYO_DECORATORS);
         if (totalDecorator != null) {
-            LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME)
-                    + ", TOPOLOGY_KRYO_DECORATOR"
-                    + totalDecorator.getClass().getName());
+            LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) + ", TOPOLOGY_KRYO_DECORATOR" + totalDecorator.getClass().getName());
             JStormUtils.mergeList(kryoDecoratorList, totalDecorator);
         }
 
@@ -132,9 +102,7 @@ public class NimbusUtils {
         for (Iterator it = cids.iterator(); it.hasNext();) {
             String componentId = (String) it.next();
 
-            ComponentCommon common =
-                    ThriftTopologyUtils.getComponentCommon(topology,
-                            componentId);
+            ComponentCommon common = ThriftTopologyUtils.getComponentCommon(topology, componentId);
             String json = common.get_json_conf();
             if (json == null) {
                 continue;
@@ -150,24 +118,18 @@ public class NimbusUtils {
                 throw new Exception(sb.toString());
             }
 
-            Object componentKryoRegister =
-                    mtmp.get(Config.TOPOLOGY_KRYO_REGISTER);
+            Object componentKryoRegister = mtmp.get(Config.TOPOLOGY_KRYO_REGISTER);
 
             if (componentKryoRegister != null) {
-                LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME)
-                        + ", componentId:" + componentId
-                        + ", TOPOLOGY_KRYO_REGISTER"
+                LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) + ", componentId:" + componentId + ", TOPOLOGY_KRYO_REGISTER"
                         + componentKryoRegister.getClass().getName());
 
                 JStormUtils.mergeList(kryoRegisterList, componentKryoRegister);
             }
 
-            Object componentDecorator =
-                    mtmp.get(Config.TOPOLOGY_KRYO_DECORATORS);
+            Object componentDecorator = mtmp.get(Config.TOPOLOGY_KRYO_DECORATORS);
             if (componentDecorator != null) {
-                LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME)
-                        + ", componentId:" + componentId
-                        + ", TOPOLOGY_KRYO_DECORATOR"
+                LOG.info("topology:" + stormConf.get(Config.TOPOLOGY_NAME) + ", componentId:" + componentId + ", TOPOLOGY_KRYO_DECORATOR"
                         + componentDecorator.getClass().getName());
                 JStormUtils.mergeList(kryoDecoratorList, componentDecorator);
             }
@@ -177,25 +139,23 @@ public class NimbusUtils {
         Map kryoRegisterMap = mapifySerializations(kryoRegisterList);
         List decoratorList = JStormUtils.distinctList(kryoDecoratorList);
 
-        Integer ackerNum =
-                JStormUtils.parseInt(totalConf
-                        .get(Config.TOPOLOGY_ACKER_EXECUTORS));
+        Integer ackerNum = JStormUtils.parseInt(totalConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
         if (ackerNum == null) {
             ackerNum = Integer.valueOf(1);
         }
 
         Map rtn = new HashMap();
+        // ensure STORM_CLUSTER_MODE is taken from the cluster conf
+        rtn.put(Config.STORM_CLUSTER_MODE, conf.get(Config.STORM_CLUSTER_MODE));
         rtn.putAll(stormConf);
-        rtn.put(Config.TOPOLOGY_KRYO_DECORATORS, decoratorList);
+        rtn.put(Config.TOPOLOGY_KRYO_DECORATORS, decoratorList);
         rtn.put(Config.TOPOLOGY_KRYO_REGISTER, kryoRegisterMap);
         rtn.put(Config.TOPOLOGY_ACKER_EXECUTORS, ackerNum);
-        rtn.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM,
-                totalConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
+        rtn.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, totalConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
         return rtn;
     }
 
-    public static Integer componentParalism(Map stormConf,
-            ComponentCommon common) {
+    public static Integer componentParalism(Map stormConf, ComponentCommon common) {
         Map mergeMap = new HashMap();
         mergeMap.putAll(stormConf);
 
@@ -223,8 +183,7 @@ public class NimbusUtils {
         // }
         // }
 
-        Object maxTaskParalismObject =
-                mergeMap.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
+        Object maxTaskParalismObject = mergeMap.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
         if (maxTaskParalismObject == null) {
             return taskNum;
         } else {
@@ -239,24 +198,19 @@ public class NimbusUtils {
      * finalize component's task parallelism
      * 
      * @param topology
-     * @param fromConf means if the paralism is read from conf file instead of
-     *            reading from topology code
+     * @param fromConf whether the parallelism is read from the conf file instead of from the topology code
      * @return
      */
-    public static StormTopology normalizeTopology(Map stormConf,
-            StormTopology topology, boolean fromConf) {
+    public static StormTopology normalizeTopology(Map stormConf, StormTopology topology, boolean fromConf) {
         StormTopology ret = topology.deepCopy();
 
-        Map<String, Object> rawComponents =
-                ThriftTopologyUtils.getComponents(topology);
+        Map<String, Object> rawComponents = ThriftTopologyUtils.getComponents(topology);
 
         Map<String, Object> components = ThriftTopologyUtils.getComponents(ret);
 
         if (rawComponents.keySet().equals(components.keySet()) == false) {
-            String errMsg =
-                    "Failed to normalize topology binary, maybe due to wrong dependency";
-            LOG.info(errMsg + " raw components:" + rawComponents.keySet()
-                    + ", normalized " + components.keySet());
+            String errMsg = "Failed to normalize topology binary, maybe due to wrong dependency";
+            LOG.info(errMsg + " raw components:" + rawComponents.keySet() + ", normalized " + components.keySet());
 
             throw new InvalidParameterException(errMsg);
         }
@@ -269,9 +223,7 @@ public class NimbusUtils {
             if (component instanceof Bolt) {
                 common = ((Bolt) component).get_common();
                 if (fromConf) {
-                    Integer paraNum =
-                            ConfigExtension.getBoltParallelism(stormConf,
-                                    componentName);
+                    Integer paraNum = ConfigExtension.getBoltParallelism(stormConf, componentName);
                     if (paraNum != null) {
                         LOG.info("Set " + componentName + " as " + paraNum);
                         common.set_parallelism_hint(paraNum);
@@ -281,9 +233,7 @@ public class NimbusUtils {
             if (component instanceof SpoutSpec) {
                 common = ((SpoutSpec) component).get_common();
                 if (fromConf) {
-                    Integer paraNum =
-                            ConfigExtension.getSpoutParallelism(stormConf,
-                                    componentName);
+                    Integer paraNum = ConfigExtension.getSpoutParallelism(stormConf, componentName);
                     if (paraNum != null) {
                         LOG.info("Set " + componentName + " as " + paraNum);
                         common.set_parallelism_hint(paraNum);
@@ -293,9 +243,7 @@ public class NimbusUtils {
             if (component instanceof StateSpoutSpec) {
                 common = ((StateSpoutSpec) component).get_common();
                 if (fromConf) {
-                    Integer paraNum =
-                            ConfigExtension.getSpoutParallelism(stormConf,
-                                    componentName);
+                    Integer paraNum = ConfigExtension.getSpoutParallelism(stormConf, componentName);
                     if (paraNum != null) {
                         LOG.info("Set " + componentName + " as " + paraNum);
                         common.set_parallelism_hint(paraNum);
@@ -307,8 +255,7 @@ public class NimbusUtils {
 
             String jsonConfString = common.get_json_conf();
             if (jsonConfString != null) {
-                componentMap
-                        .putAll((Map) JStormUtils.from_json(jsonConfString));
+                componentMap.putAll((Map) JStormUtils.from_json(jsonConfString));
             }
 
             Integer taskNum = componentParalism(stormConf, common);
@@ -328,20 +275,16 @@ public class NimbusUtils {
      * clean the topology which is in ZK but not in local dir
      * 
      * @throws Exception
-     * 
      */
-    public static void cleanupCorruptTopologies(NimbusData data)
-            throws Exception {
+    public static void cleanupCorruptTopologies(NimbusData data) throws Exception {
 
         StormClusterState stormClusterState = data.getStormClusterState();
 
         // get /local-storm-dir/nimbus/stormdist path
-        String master_stormdist_root =
-                StormConfig.masterStormdistRoot(data.getConf());
+        String master_stormdist_root = StormConfig.masterStormdistRoot(data.getConf());
 
         // listdir /local-storm-dir/nimbus/stormdist
-        List<String> code_ids =
-                PathUtils.read_dir_contents(master_stormdist_root);
+        List<String> code_ids = PathUtils.read_dir_contents(master_stormdist_root);
 
         // get topology in ZK /storms
         List<String> active_ids = data.getStormClusterState().active_storms();
@@ -352,9 +295,7 @@ public class NimbusUtils {
             }
 
             for (String corrupt : active_ids) {
-                LOG.info("Corrupt topology "
-                        + corrupt
-                        + " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...");
+                LOG.info("Corrupt topology " + corrupt + " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...");
 
                 /**
                  * Just removing the /STORMS is enough
@@ -368,53 +309,47 @@ public class NimbusUtils {
 
     }
 
-    public static boolean isTaskDead(NimbusData data, String topologyId,
-            Integer taskId) {
+    public static boolean isTaskDead(NimbusData data, String topologyId, Integer taskId) {
         String idStr = " topology:" + topologyId + ",taskid:" + taskId;
 
-        Integer zkReportTime = null;
+        TopologyTaskHbInfo topoTasksHbInfo = data.getTasksHeartbeat().get(topologyId);
+        Map<Integer, TaskHeartbeat> taskHbMap = null;
+        Integer taskReportTime = null;
 
-        StormClusterState stormClusterState = data.getStormClusterState();
-        TaskHeartbeat zkTaskHeartbeat = null;
-        try {
-            zkTaskHeartbeat =
-                    stormClusterState.task_heartbeat(topologyId, taskId);
-            if (zkTaskHeartbeat != null) {
-                zkReportTime = zkTaskHeartbeat.getTimeSecs();
+        if (topoTasksHbInfo != null) {
+            taskHbMap = topoTasksHbInfo.get_taskHbs();
+            if (taskHbMap != null) {
+                TaskHeartbeat tHb = taskHbMap.get(taskId);
+                taskReportTime = ((tHb != null ) ? tHb.get_time() : null);
             }
-        } catch (Exception e) {
-            LOG.error("Failed to get ZK task hearbeat " + idStr, e);
         }
 
-        Map<Integer, TkHbCacheTime> taskHBs =
-                data.getTaskHeartbeatsCache(topologyId, true);
+        Map<Integer, TkHbCacheTime> taskHBs = data.getTaskHeartbeatsCache(topologyId, true);
 
         TkHbCacheTime taskHB = taskHBs.get(taskId);
         if (taskHB == null) {
             LOG.info("No task heartbeat cache " + idStr);
 
-            if (zkTaskHeartbeat == null) {
-                LOG.info("No ZK task hearbeat " + idStr);
+            if (topoTasksHbInfo == null || taskHbMap == null) {
+                LOG.info("No task heartbeat was reported for " + idStr);
                 return true;
             }
 
             taskHB = new TkHbCacheTime();
-            taskHB.update(zkTaskHeartbeat);
+            taskHB.update(taskHbMap.get(taskId));
 
             taskHBs.put(taskId, taskHB);
 
             return false;
         }
 
-        if (zkReportTime == null) {
-            LOG.debug("No ZK task heartbeat " + idStr);
+        if (taskReportTime == null || taskReportTime < taskHB.getTaskAssignedTime()) {
+            LOG.debug("No task heartbeat was reported for " + idStr);
             // Task hasn't finished init
             int nowSecs = TimeUtils.current_time_secs();
             int assignSecs = taskHB.getTaskAssignedTime();
 
-            int waitInitTimeout =
-                    JStormUtils.parseInt(data.getConf().get(
-                            Config.NIMBUS_TASK_LAUNCH_SECS));
+            int waitInitTimeout = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_LAUNCH_SECS));
 
             if (nowSecs - assignSecs > waitInitTimeout) {
                 LOG.info(idStr + " failed to init ");
@@ -433,30 +368,29 @@ public class NimbusUtils {
         int nowSecs = TimeUtils.current_time_secs();
         if (nimbusTime == 0) {
             // taskHB no entry, first time
-            // update taskHB
+            // update taskHB with the latest taskReportTime
             taskHB.setNimbusTime(nowSecs);
-            taskHB.setTaskReportedTime(zkReportTime);
+            taskHB.setTaskReportedTime(taskReportTime);
 
             LOG.info("Update taskheartbeat to nimbus cache " + idStr);
             return false;
         }
 
-        if (reportTime != zkReportTime.intValue()) {
+        if (reportTime != taskReportTime.intValue()) {
             // the report time has been updated
             taskHB.setNimbusTime(nowSecs);
-            taskHB.setTaskReportedTime(zkReportTime);
+            taskHB.setTaskReportedTime(taskReportTime);
 
-            LOG.debug(idStr + ",nimbusTime " + nowSecs + ",zkReport:"
-                    + zkReportTime + ",report:" + reportTime);
+            LOG.debug(idStr + ",nimbusTime " + nowSecs + ",zkReport:" + taskReportTime + ",report:" + reportTime);
             return false;
         }
 
         // the following is (taskReportTime == reportTime)
         Integer taskHBTimeout = data.getTopologyTaskTimeout().get(topologyId);
         if (taskHBTimeout == null)
-            taskHBTimeout =
-                    JStormUtils.parseInt(data.getConf().get(
-                            Config.NIMBUS_TASK_TIMEOUT_SECS));
+            taskHBTimeout = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_TIMEOUT_SECS));
+        if (taskId == topoTasksHbInfo.get_topologyMasterId())
+            taskHBTimeout = (taskHBTimeout / 2);
         if (nowSecs - nimbusTime > taskHBTimeout) {
             // task is dead
             long ts = ((long) nimbusTime) * 1000;
@@ -470,7 +404,7 @@ public class NimbusUtils {
             sb.append(",current ");
             sb.append(nowSecs);
             sb.append(":").append(new Date(((long) nowSecs) * 1000));
-            LOG.info(sb.toString());
+            LOG.debug(sb.toString());
             return true;
         }
 
@@ -478,13 +412,10 @@ public class NimbusUtils {
 
     }
 
-    public static void updateTaskHbStartTime(NimbusData data,
-            Assignment assignment, String topologyId) {
-        Map<Integer, TkHbCacheTime> taskHBs =
-                data.getTaskHeartbeatsCache(topologyId, true);
+    public static void updateTaskHbStartTime(NimbusData data, Assignment assignment, String topologyId) {
+        Map<Integer, TkHbCacheTime> taskHBs = data.getTaskHeartbeatsCache(topologyId, true);
 
-        Map<Integer, Integer> taskStartTimes =
-                assignment.getTaskStartTimeSecs();
+        Map<Integer, Integer> taskStartTimes = assignment.getTaskStartTimeSecs();
         for (Entry<Integer, Integer> entry : taskStartTimes.entrySet()) {
             Integer taskId = entry.getKey();
             Integer taskStartTime = entry.getValue();
@@ -501,25 +432,19 @@ public class NimbusUtils {
         return;
     }
 
-    public static <T> void transitionName(NimbusData data, String topologyName,
-            boolean errorOnNoTransition, StatusType transition_status,
-            T... args) throws Exception {
+    public static <T> void transitionName(NimbusData data, String topologyName, boolean errorOnNoTransition, StatusType transition_status, T... args)
+            throws Exception {
         StormClusterState stormClusterState = data.getStormClusterState();
-        String topologyId =
-                Cluster.get_topology_id(stormClusterState, topologyName);
+        String topologyId = Cluster.get_topology_id(stormClusterState, topologyName);
         if (topologyId == null) {
             throw new NotAliveException(topologyName);
         }
-        transition(data, topologyId, errorOnNoTransition, transition_status,
-                args);
+        transition(data, topologyId, errorOnNoTransition, transition_status, args);
     }
 
-    public static <T> void transition(NimbusData data, String topologyid,
-            boolean errorOnNoTransition, StatusType transition_status,
-            T... args) {
+    public static <T> void transition(NimbusData data, String topologyid, boolean errorOnNoTransition, StatusType transition_status, T... args) {
         try {
-            data.getStatusTransition().transition(topologyid,
-                    errorOnNoTransition, transition_status, args);
+            data.getStatusTransition().transition(topologyid, errorOnNoTransition, transition_status, args);
         } catch (Exception e) {
             // TODO Auto-generated catch block
             LOG.error("Failed to do status transition,", e);
@@ -535,22 +460,17 @@ public class NimbusUtils {
         return numTasks;
     }
 
-    public static List<TopologySummary> getTopologySummary(
-            StormClusterState stormClusterState,
-            Map<String, Assignment> assignments) throws Exception {
-        List<TopologySummary> topologySummaries =
-                new ArrayList<TopologySummary>();
+    public static List<TopologySummary> getTopologySummary(StormClusterState stormClusterState, Map<String, Assignment> assignments) throws Exception {
+        List<TopologySummary> topologySummaries = new ArrayList<TopologySummary>();
 
         // get all active topology's StormBase
-        Map<String, StormBase> bases =
-                Cluster.get_all_StormBase(stormClusterState);
+        Map<String, StormBase> bases = Cluster.get_all_StormBase(stormClusterState);
         for (Entry<String, StormBase> entry : bases.entrySet()) {
 
             String topologyId = entry.getKey();
             StormBase base = entry.getValue();
 
-            Assignment assignment =
-                    stormClusterState.assignment_info(topologyId, null);
+            Assignment assignment = stormClusterState.assignment_info(topologyId, null);
             if (assignment == null) {
                 LOG.error("Failed to get assignment of " + topologyId);
                 continue;
@@ -571,11 +491,10 @@ public class NimbusUtils {
             topology.set_id(topologyId);
             topology.set_name(base.getStormName());
             topology.set_status(base.getStatusString());
-            topology.set_uptime_secs(TimeUtils.time_delta(base
-                    .getLanchTimeSecs()));
-            topology.set_num_workers(num_workers);
-            topology.set_num_tasks(num_tasks);
-            topology.set_error_info(errorString);
+            topology.set_uptimeSecs(TimeUtils.time_delta(base.getLanchTimeSecs()));
+            topology.set_numWorkers(num_workers);
+            topology.set_numTasks(num_tasks);
+            topology.set_errorInfo(errorString);
 
             topologySummaries.add(topology);
 
@@ -584,34 +503,26 @@ public class NimbusUtils {
         return topologySummaries;
     }
 
-    public static SupervisorSummary mkSupervisorSummary(
-            SupervisorInfo supervisorInfo, String supervisorId,
-            Map<String, Integer> supervisorToUsedSlotNum) {
+    public static SupervisorSummary mkSupervisorSummary(SupervisorInfo supervisorInfo, String supervisorId, Map<String, Integer> supervisorToUsedSlotNum) {
         Integer usedNum = supervisorToUsedSlotNum.get(supervisorId);
 
         SupervisorSummary summary =
-                new SupervisorSummary(supervisorInfo.getHostName(),
-                        supervisorId, supervisorInfo.getUptimeSecs(),
-                        supervisorInfo.getWorkerPorts().size(),
+                new SupervisorSummary(supervisorInfo.getHostName(), supervisorId, supervisorInfo.getUptimeSecs(), supervisorInfo.getWorkerPorts().size(),
                         usedNum == null ? 0 : usedNum);
 
         return summary;
     }
 
-    public static List<SupervisorSummary> mkSupervisorSummaries(
-            Map<String, SupervisorInfo> supervisorInfos,
-            Map<String, Assignment> assignments) {
+    public static List<SupervisorSummary> mkSupervisorSummaries(Map<String, SupervisorInfo> supervisorInfos, Map<String, Assignment> assignments) {
 
-        Map<String, Integer> supervisorToLeftSlotNum =
-                new HashMap<String, Integer>();
+        Map<String, Integer> supervisorToLeftSlotNum = new HashMap<String, Integer>();
         for (Entry<String, Assignment> entry : assignments.entrySet()) {
             Set<ResourceWorkerSlot> workers = entry.getValue().getWorkers();
 
             for (ResourceWorkerSlot worker : workers) {
 
                 String supervisorId = worker.getNodeId();
-                SupervisorInfo supervisorInfo =
-                        supervisorInfos.get(supervisorId);
+                SupervisorInfo supervisorInfo = supervisorInfos.get(supervisorId);
                 if (supervisorInfo == null) {
                     continue;
                 }
@@ -629,9 +540,7 @@ public class NimbusUtils {
             String supervisorId = entry.getKey();
             SupervisorInfo supervisorInfo = entry.getValue();
 
-            SupervisorSummary summary =
-                    mkSupervisorSummary(supervisorInfo, supervisorId,
-                            supervisorToLeftSlotNum);
+            SupervisorSummary summary = mkSupervisorSummary(supervisorInfo, supervisorId, supervisorToLeftSlotNum);
 
             ret.add(summary);
         }
@@ -648,27 +557,24 @@ public class NimbusUtils {
         return ret;
     }
 
-    public static NimbusSummary getNimbusSummary(
-            StormClusterState stormClusterState,
-            List<SupervisorSummary> supervisorSummaries, NimbusData data)
+    public static NimbusSummary getNimbusSummary(StormClusterState stormClusterState, List<SupervisorSummary> supervisorSummaries, NimbusData data)
             throws Exception {
         NimbusSummary ret = new NimbusSummary();
 
         String master = stormClusterState.get_leader_host();
         NimbusStat nimbusMaster = new NimbusStat();
         nimbusMaster.set_host(master);
-        nimbusMaster.set_uptime_secs(String.valueOf(data.uptime()));
-        ret.set_nimbus_master(nimbusMaster);
+        nimbusMaster.set_uptimeSecs(String.valueOf(data.uptime()));
+        ret.set_nimbusMaster(nimbusMaster);
 
         List<NimbusStat> nimbusSlaveList = new ArrayList<NimbusStat>();
-        ret.set_nimbus_slaves(nimbusSlaveList);
-        Map<String, String> nimbusSlaveMap =
-                Cluster.get_all_nimbus_slave(stormClusterState);
+        ret.set_nimbusSlaves(nimbusSlaveList);
+        Map<String, String> nimbusSlaveMap = Cluster.get_all_nimbus_slave(stormClusterState);
         if (nimbusSlaveMap != null) {
             for (Entry<String, String> entry : nimbusSlaveMap.entrySet()) {
                 NimbusStat slave = new NimbusStat();
                 slave.set_host(entry.getKey());
-                slave.set_uptime_secs(entry.getValue());
+                slave.set_uptimeSecs(entry.getValue());
 
                 nimbusSlaveList.add(slave);
             }
@@ -678,46 +584,75 @@ public class NimbusUtils {
         int usedPort = 0;
 
         for (SupervisorSummary supervisor : supervisorSummaries) {
-            totalPort += supervisor.get_num_workers();
-            usedPort += supervisor.get_num_used_workers();
+            totalPort += supervisor.get_numWorkers();
+            usedPort += supervisor.get_numUsedWorkers();
         }
 
-        ret.set_supervisor_num(supervisorSummaries.size());
-        ret.set_total_port_num(totalPort);
-        ret.set_used_port_num(usedPort);
-        ret.set_free_port_num(totalPort - usedPort);
+        ret.set_supervisorNum(supervisorSummaries.size());
+        ret.set_totalPortNum(totalPort);
+        ret.set_usedPortNum(usedPort);
+        ret.set_freePortNum(totalPort - usedPort);
         ret.set_version(Utils.getVersion());
 
         return ret;
 
     }
 
-    public static void updateTopologyTaskTimeout(NimbusData data,
-            String topologyId) {
+    public static void updateTopologyTaskTimeout(NimbusData data, String topologyId) {
         Map topologyConf = null;
         try {
-            topologyConf =
-                    StormConfig.read_nimbus_topology_conf(data.getConf(),
-                            topologyId);
+            topologyConf = StormConfig.read_nimbus_topology_conf(data.getConf(), topologyId);
         } catch (IOException e) {
-            LOG.warn("Failed to read configuration of " + topologyId + ", "
-                    + e.getMessage());
+            LOG.warn("Failed to read configuration of " + topologyId + ", " + e.getMessage());
         }
 
-        Integer timeout =
-                JStormUtils.parseInt(topologyConf
-                        .get(Config.NIMBUS_TASK_TIMEOUT_SECS));
+        Integer timeout = JStormUtils.parseInt(topologyConf.get(Config.NIMBUS_TASK_TIMEOUT_SECS));
         if (timeout == null) {
-            timeout =
-                    JStormUtils.parseInt(data.getConf().get(
-                            Config.NIMBUS_TASK_TIMEOUT_SECS));
+            timeout = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_TIMEOUT_SECS));
         }
         LOG.info("Setting taskTimeout:" + timeout + " for " + topologyId);
         data.getTopologyTaskTimeout().put(topologyId, timeout);
     }
 
-    public static void removeTopologyTaskTimeout(NimbusData data,
-            String topologyId) {
+    public static void removeTopologyTaskTimeout(NimbusData data, String topologyId) {
         data.getTopologyTaskTimeout().remove(topologyId);
     }
+
+    public static void updateTopologyTaskHb(NimbusData data, String topologyId) {
+        StormClusterState clusterState = data.getStormClusterState();
+        TopologyTaskHbInfo topologyTaskHb = null;
+
+        try {
+            topologyTaskHb = clusterState.topology_heartbeat(topologyId);
+        } catch (Exception e) {
+            LOG.error("updateTopologyTaskHb: Failed to get topology task heartbeat info", e);
+        }
+
+        if (topologyTaskHb != null) {
+            data.getTasksHeartbeat().put(topologyId, topologyTaskHb);
+        }
+    }
+
+    public static void removeTopologyTaskHb(NimbusData data, String topologyId, int taskId) {
+        TopologyTaskHbInfo topologyTaskHbs = data.getTasksHeartbeat().get(topologyId);
+
+        if (topologyTaskHbs != null) {
+            Map<Integer, TaskHeartbeat> taskHbs = topologyTaskHbs.get_taskHbs();
+            if (taskHbs != null) {
+                taskHbs.remove(taskId);
+            }
+        }
+    }
+
+    public static int getTopologyMasterId(Map<Integer, TaskInfo> tasksInfo) {
+        int ret = 0;
+        for (Entry<Integer, TaskInfo> entry : tasksInfo.entrySet()) {
+            if (entry.getValue().getComponentId().equalsIgnoreCase(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
+                ret = entry.getKey();
+                break;
+            }
+        }
+
+        return ret;
+    }
 }
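
The isTaskDead() rewrite above stops reading per-task heartbeats from ZK and instead consults the in-memory tasksHeartbeat map that NimbusData now carries, and it halves the timeout when the task being checked is the topology master. A minimal sketch of just that timeout rule, with illustrative names (the real method also handles the not-yet-initialized task and cache-update paths shown in the diff):

public class TaskLivenessSketch {
    // Dead if nimbus has not observed a new report time for longer than the
    // timeout; the topology master is held to half the normal window.
    static boolean isDead(int nowSecs, int nimbusTimeSecs, int taskHbTimeoutSecs, boolean isTopologyMaster) {
        int timeout = isTopologyMaster ? taskHbTimeoutSecs / 2 : taskHbTimeoutSecs;
        return nowSecs - nimbusTimeSecs > timeout;
    }

    public static void main(String[] args) {
        System.out.println(isDead(1000, 800, 240, false)); // false: still inside the 240s window
        System.out.println(isDead(1000, 800, 240, true));  // true: master window shrinks to 120s
    }
}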