Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:45 UTC

[07/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java
index 1d77089..7db0ed4 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java
@@ -17,80 +17,99 @@
  */
 package com.alibaba.jstorm.metric;
 
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.common.metric.Histogram;
-import com.alibaba.jstorm.common.metric.MetricRegistry;
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-public class SimpleJStormMetric extends JStormMetrics implements Runnable{
-    private static final Logger LOG = LoggerFactory.getLogger(SimpleJStormMetric.class);
-    
-    protected static MetricRegistry metrics = JStormMetrics.workerMetrics;
-    static {
-        Metric.setEnable(true);
+import com.alibaba.jstorm.common.metric.*;
+import com.codahale.metrics.Gauge;
+
+/**
+ * Simplified metrics, only for worker metrics. All metrics are logged locally without reporting to TM or nimbus.
+ *
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class SimpleJStormMetric extends JStormMetrics {
+    private static final long serialVersionUID = 7468005641982249536L;
+
+    protected static final AsmMetricRegistry metrics = JStormMetrics.getWorkerMetrics();
+
+    public static void updateNimbusHistogram(String name, Number obj) {
+        updateHistogram(NIMBUS_METRIC_KEY, name, obj);
     }
-    
-    protected static SimpleJStormMetric instance = null;
-    
-    
-    public static SimpleJStormMetric mkInstance() {
-        synchronized (SimpleJStormMetric.class) {
-            if (instance == null) {
-                instance = new SimpleJStormMetric();
-            }
-            
-            return instance;
-        }
+
+    public static void updateSupervisorHistogram(String name, Number obj) {
+        updateHistogram(SUPERVISOR_METRIC_KEY, name, obj);
     }
-    
-    protected SimpleJStormMetric() {
-        
+
+    public static void updateNimbusMeter(String name, Number obj) {
+        updateMeter(NIMBUS_METRIC_KEY, name, obj);
+    }
+
+    public static void updateSupervisorMeter(String name, Number obj) {
+        updateMeter(SUPERVISOR_METRIC_KEY, name, obj);
+    }
+
+    public static void updateNimbusCounter(String name, Number obj) {
+        updateCounter(NIMBUS_METRIC_KEY, name, obj);
     }
-    
-    public static Histogram registerHistorgram(String name) {
-        return JStormMetrics.registerWorkerHistogram(name);
+
+    public static void updateSupervisorCounter(String name, Number obj) {
+        updateCounter(SUPERVISOR_METRIC_KEY, name, obj);
     }
-    
-    public static void updateHistorgram(String name, Number obj) {
-        LOG.debug(name  + ":" + obj.doubleValue());
-        Histogram histogram =  (Histogram)metrics.getMetric(name);
+
+    public static void updateHistogram(String key, String name, Number obj) {
+        String formalName = MetricUtils.workerMetricName(key, host, 0, name, MetricType.HISTOGRAM);
+        AsmHistogram histogram = (AsmHistogram) metrics.getMetric(formalName);
         if (histogram == null) {
-        	try {
-        		histogram = registerHistorgram(name);
-        	}catch(Exception e) {
-        		LOG.info("{} has been register", name);
-        		return;
-        	}
+            histogram = registerHistogram(name);
         }
-        
+
         histogram.update(obj);
-        
     }
 
-    @Override
-    public void run() {
-        // TODO Auto-generated method stub
-        Map<String, Metric> map = metrics.getMetrics();
-        
-        for (Entry<String, Metric> entry : map.entrySet()) {
-            String key = entry.getKey();
-            Metric metric = entry.getValue();
-            
-            LOG.info(key + ":" +  metric.getSnapshot());
+    public static void updateMeter(String key, String name, Number obj) {
+        String formalName = MetricUtils.workerMetricName(key, host, 0, name, MetricType.METER);
+        AsmMeter meter = (AsmMeter) metrics.getMetric(formalName);
+        if (meter == null) {
+            meter = registerMeter(name);
         }
+
+        meter.update(obj);
+    }
+
+    public static void updateCounter(String key, String name, Number obj) {
+        String formalName = MetricUtils.workerMetricName(key, host, 0, name, MetricType.COUNTER);
+        AsmCounter counter = (AsmCounter) metrics.getMetric(formalName);
+        if (counter == null) {
+            counter = registerCounter(name);
+        }
+
+        counter.update(obj);
+    }
+
+    private static AsmGauge registerGauge(Gauge<Double> gauge, String name) {
+        AsmGauge gauge1 = new AsmGauge(gauge);
+        gauge1.setOp(AsmMetric.MetricOp.LOG);
+
+        return registerWorkerGauge(topologyId, name, gauge1);
     }
-    
-    
-    public static void main(String[] args) {
-        updateHistorgram("test", 11100.0);
-        
-        SimpleJStormMetric instance = new SimpleJStormMetric();
-        
-        instance.run();
+
+    private static AsmHistogram registerHistogram(String name) {
+        AsmHistogram histogram = new AsmHistogram();
+        histogram.setOp(AsmMetric.MetricOp.LOG);
+
+        return registerWorkerHistogram(NIMBUS_METRIC_KEY, name, histogram);
+    }
+
+    public static AsmMeter registerMeter(String name) {
+        AsmMeter meter = new AsmMeter();
+        meter.setOp(AsmMetric.MetricOp.LOG);
+
+        return registerWorkerMeter(NIMBUS_METRIC_KEY, name, meter);
+    }
+
+    public static AsmCounter registerCounter(String name) {
+        AsmCounter counter = new AsmCounter();
+        counter.setOp(AsmMetric.MetricOp.LOG);
+
+        return registerWorkerCounter(NIMBUS_METRIC_KEY, name, counter);
     }
 }
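
A hedged usage sketch (not part of this patch) of the new static helpers above, assuming the JStormMetrics statics (host, topology id) are initialized as they would be inside a running nimbus/worker process; the metric names and the sleep stand-in are invented for illustration:

    import com.alibaba.jstorm.metric.SimpleJStormMetric;

    public class SimpleMetricUsageSketch {
        public static void main(String[] args) throws Exception {
            long start = System.currentTimeMillis();
            Thread.sleep(50);                                  // stand-in for real work
            long elapsed = System.currentTimeMillis() - start;

            // histogram of elapsed time, counter of runs, meter of events (names are made up)
            SimpleJStormMetric.updateNimbusHistogram("ScheduleTimeMs", elapsed);
            SimpleJStormMetric.updateNimbusCounter("ScheduleRuns", 1);
            SimpleJStormMetric.updateSupervisorMeter("HeartbeatRate", 1);
        }
    }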

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TimeTicker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TimeTicker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TimeTicker.java
new file mode 100644
index 0000000..e44e7b5
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TimeTicker.java
@@ -0,0 +1,52 @@
+package com.alibaba.jstorm.metric;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A simple util class to measure elapsed run time
+ *
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class TimeTicker {
+    private TimeUnit unit;
+    private long start;
+    private long end;
+
+    public TimeTicker(TimeUnit unit) {
+        if (unit != TimeUnit.NANOSECONDS && unit != TimeUnit.MILLISECONDS) {
+            throw new IllegalArgumentException("invalid unit!");
+        }
+        this.unit = unit;
+    }
+
+    public TimeTicker(TimeUnit unit, boolean start) {
+        this(unit);
+        if (start) {
+            start();
+        }
+    }
+
+    public void start() {
+        if (unit == TimeUnit.MILLISECONDS) {
+            this.start = System.currentTimeMillis();
+        } else if (unit == TimeUnit.NANOSECONDS) {
+            this.start = System.nanoTime();
+        }
+    }
+
+    public long stop() {
+        if (unit == TimeUnit.MILLISECONDS) {
+            this.end = System.currentTimeMillis();
+        } else if (unit == TimeUnit.NANOSECONDS) {
+            this.end = System.nanoTime();
+        }
+        return end - start;
+    }
+
+    public long stopAndRestart() {
+        long elapsed = stop();
+        start();
+        return elapsed;
+    }
+}
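
A brief usage sketch of the new TimeTicker (caller code assumed, not part of this patch):

    import java.util.concurrent.TimeUnit;
    import com.alibaba.jstorm.metric.TimeTicker;

    public class TimeTickerSketch {
        public static void main(String[] args) throws InterruptedException {
            TimeTicker ticker = new TimeTicker(TimeUnit.MILLISECONDS, true); // starts immediately
            Thread.sleep(20);                           // stand-in for the work being timed
            long firstPassMs = ticker.stopAndRestart(); // elapsed so far, then keeps timing
            Thread.sleep(20);
            long secondPassMs = ticker.stop();
            System.out.println(firstPassMs + " ms, " + secondPassMs + " ms");
        }
    }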

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TopologyMetricContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TopologyMetricContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TopologyMetricContext.java
new file mode 100644
index 0000000..8dae281
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TopologyMetricContext.java
@@ -0,0 +1,528 @@
+package com.alibaba.jstorm.metric;
+
+import backtype.storm.generated.MetricInfo;
+import backtype.storm.generated.MetricSnapshot;
+import backtype.storm.generated.TopologyMetric;
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A topology metric context contains all in-memory metric data of a topology.
+ * This class resides in TopologyMaster.
+ *
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class TopologyMetricContext {
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private Set<ResourceWorkerSlot> workerSet;
+    private int taskNum = 1;
+    private ConcurrentMap<String, MetricInfo> memCache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Long> memMeta = new ConcurrentHashMap<>();
+    private final AtomicBoolean isMerging = new AtomicBoolean(false);
+    private String topologyId;
+    private volatile int flushedMetaNum = 0;
+
+    /**
+     * sync meta from metric cache on startup
+     */
+    private volatile boolean syncMeta = false;
+
+    private Map conf;
+
+    public TopologyMetricContext() {
+    }
+
+    public TopologyMetricContext(Set<ResourceWorkerSlot> workerSet) {
+        this.workerSet = workerSet;
+    }
+
+    public TopologyMetricContext(String topologyId, Set<ResourceWorkerSlot> workerSet, Map conf) {
+        this(workerSet);
+        this.topologyId = topologyId;
+        this.conf = conf;
+    }
+
+    public ConcurrentMap<String, Long> getMemMeta() {
+        return memMeta;
+    }
+
+    public String getTopologyId() {
+        return topologyId;
+    }
+
+    public void setTopologyId(String topologyId) {
+        this.topologyId = topologyId;
+    }
+
+    public boolean syncMeta() {
+        return syncMeta;
+    }
+
+    public void setSyncMeta(boolean syncMeta) {
+        this.syncMeta = syncMeta;
+    }
+
+    public int getTaskNum() {
+        return taskNum;
+    }
+
+    public void setTaskNum(int taskNum) {
+        this.taskNum = taskNum;
+    }
+
+    public int getFlushedMetaNum() {
+        return flushedMetaNum;
+    }
+
+    public void setFlushedMetaNum(int flushedMetaNum) {
+        this.flushedMetaNum = flushedMetaNum;
+    }
+
+    public ReentrantLock getLock() {
+        return lock;
+    }
+
+    public int getWorkerNum() {
+        return workerSet.size();
+    }
+
+    public void setWorkerSet(Set<ResourceWorkerSlot> workerSet) {
+        this.workerSet = workerSet;
+    }
+
+    public void resetUploadedMetrics() {
+        this.memCache.clear();
+    }
+
+    public final ConcurrentMap<String, MetricInfo> getMemCache() {
+        return memCache;
+    }
+
+    public void addToMemCache(String workerSlot, MetricInfo metricInfo) {
+        memCache.put(workerSlot, metricInfo);
+        LOG.info("update mem cache, worker:{}, total uploaded:{}", workerSlot, memCache.size());
+    }
+
+    public boolean readyToUpload() {
+        return memCache.size() >= workerSet.size();
+    }
+
+    public boolean isMerging() {
+        return isMerging.get();
+    }
+
+    public void setMerging(boolean isMerging) {
+        this.isMerging.set(isMerging);
+    }
+
+    public int getUploadedWorkerNum() {
+        return memCache.size();
+    }
+
+    public TopologyMetric mergeMetrics() {
+        long start = System.currentTimeMillis();
+
+        if (getMemCache().size() == 0) {
+            //LOG.info("topology:{}, metric size is 0, skip...", topologyId);
+            return null;
+        }
+        if (isMerging()) {
+            LOG.info("topology {} is already merging, skip...", topologyId);
+            return null;
+        }
+
+        setMerging(true);
+
+        try {
+            Map<String, MetricInfo> workerMetricMap = this.memCache;
+            // reset mem cache
+            this.memCache = new ConcurrentHashMap<>();
+
+            MetricInfo topologyMetrics = MetricUtils.mkMetricInfo();
+            MetricInfo componentMetrics = MetricUtils.mkMetricInfo();
+            MetricInfo taskMetrics = MetricUtils.mkMetricInfo();
+            MetricInfo streamMetrics = MetricUtils.mkMetricInfo();
+            MetricInfo workerMetrics = MetricUtils.mkMetricInfo();
+            MetricInfo nettyMetrics = MetricUtils.mkMetricInfo();
+            TopologyMetric tpMetric =
+                    new TopologyMetric(topologyMetrics, componentMetrics, workerMetrics, taskMetrics, streamMetrics, nettyMetrics);
+
+
+            // metric name => worker count
+            Map<String, Integer> metricNameCounters = new HashMap<>();
+
+            // special for histograms & timers, we merge the points to get a new snapshot data.
+            Map<String, Map<Integer, Histogram>> histograms = new HashMap<>();
+            Map<String, Map<Integer, Timer>> timers = new HashMap<>();
+
+            // iterate metrics of all workers within the same topology
+            for (ConcurrentMap.Entry<String, MetricInfo> metricEntry : workerMetricMap.entrySet()) {
+                MetricInfo metricInfo = metricEntry.getValue();
+
+                // merge counters: add old and new values, note we only add incoming new metrics and overwrite
+                // existing data, same for all below.
+                Map<String, Map<Integer, MetricSnapshot>> metrics = metricInfo.get_metrics();
+                for (Map.Entry<String, Map<Integer, MetricSnapshot>> metric : metrics.entrySet()) {
+                    String metricName = metric.getKey();
+                    Map<Integer, MetricSnapshot> data = metric.getValue();
+                    MetaType metaType = MetricUtils.metaType(metricName);
+
+                    MetricType metricType = MetricUtils.metricType(metricName);
+                    if (metricType == MetricType.COUNTER) {
+                        mergeCounters(tpMetric, metaType, metricName, data);
+                    } else if (metricType == MetricType.GAUGE) {
+                        mergeGauges(tpMetric, metaType, metricName, data);
+                    } else if (metricType == MetricType.METER) {
+                        mergeMeters(getMetricInfoByType(tpMetric, metaType), metricName, data, metricNameCounters);
+                    } else if (metricType == MetricType.HISTOGRAM) {
+                        mergeHistograms(getMetricInfoByType(tpMetric, metaType),
+                                metricName, data, metricNameCounters, histograms);
+                    } else if (metricType == MetricType.TIMER) {
+                        mergeTimers(getMetricInfoByType(tpMetric, metaType),
+                                metricName, data, metricNameCounters, timers);
+                    }
+                }
+            }
+            adjustHistogramTimerMetrics(tpMetric, metricNameCounters, histograms, timers);
+            // for counters, we only report delta data every time, need to sum with old data
+            //adjustCounterMetrics(tpMetric, oldTpMetric);
+
+            LOG.info("merge topology metrics:{}, cost:{}", topologyId, System.currentTimeMillis() - start);
+            // debug logs
+            //MetricUtils.printMetricWinSize(componentMetrics);
+
+            return tpMetric;
+        } finally {
+            setMerging(false);
+        }
+    }
+
+
+    protected MetricInfo getMetricInfoByType(TopologyMetric topologyMetric, MetaType type) {
+        if (type == MetaType.TASK) {
+            return topologyMetric.get_taskMetric();
+        } else if (type == MetaType.WORKER) {
+            return topologyMetric.get_workerMetric();
+        } else if (type == MetaType.COMPONENT) {
+            return topologyMetric.get_componentMetric();
+        } else if (type == MetaType.STREAM) {
+            return topologyMetric.get_streamMetric();
+        } else if (type == MetaType.NETTY) {
+            return topologyMetric.get_nettyMetric();
+        } else if (type == MetaType.TOPOLOGY) {
+            return topologyMetric.get_topologyMetric();
+        }
+        return null;
+    }
+
+    public void mergeCounters(TopologyMetric tpMetric, MetaType metaType, String meta,
+                              Map<Integer, MetricSnapshot> data) {
+        MetricInfo metricInfo = getMetricInfoByType(tpMetric, metaType);
+        Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
+        if (existing == null) {
+            metricInfo.put_to_metrics(meta, data);
+        } else {
+            for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+                Integer win = dataEntry.getKey();
+                MetricSnapshot snapshot = dataEntry.getValue();
+                MetricSnapshot old = existing.get(win);
+                if (old == null) {
+                    existing.put(win, snapshot);
+                } else {
+                    old.set_ts(snapshot.get_ts());
+                    old.set_longValue(old.get_longValue() + snapshot.get_longValue());
+                }
+            }
+        }
+    }
+
+    public void mergeGauges(TopologyMetric tpMetric, MetaType metaType, String meta,
+                            Map<Integer, MetricSnapshot> data) {
+        MetricInfo metricInfo = getMetricInfoByType(tpMetric, metaType);
+        Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
+        if (existing == null) {
+            metricInfo.put_to_metrics(meta, data);
+        } else {
+            for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+                Integer win = dataEntry.getKey();
+                MetricSnapshot snapshot = dataEntry.getValue();
+                MetricSnapshot old = existing.get(win);
+                if (old == null) {
+                    existing.put(win, snapshot);
+                } else {
+                    if (snapshot.get_ts() >= old.get_ts()) {
+                        old.set_ts(snapshot.get_ts());
+                        if (metaType != MetaType.TOPOLOGY) {
+                            old.set_doubleValue(snapshot.get_doubleValue());
+                        } else { // for topology metric, gauge might be add-able, e.g., cpu, memory, etc.
+                            old.set_doubleValue(old.get_doubleValue() + snapshot.get_doubleValue());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * meters are not sampled.
+     */
+    public void mergeMeters(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data,
+                            Map<String, Integer> metaCounters) {
+        Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
+        if (existing == null) {
+            metricInfo.put_to_metrics(meta, data);
+        } else {
+            for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+                Integer win = dataEntry.getKey();
+                MetricSnapshot snapshot = dataEntry.getValue();
+                MetricSnapshot old = existing.get(win);
+                if (old == null) {
+                    existing.put(win, snapshot);
+                } else {
+                    if (snapshot.get_ts() >= old.get_ts()) {
+                        old.set_ts(snapshot.get_ts());
+                        old.set_mean(old.get_mean() + snapshot.get_mean());
+                        old.set_m1(old.get_m1() + snapshot.get_m1());
+                        old.set_m5(old.get_m5() + snapshot.get_m5());
+                        old.set_m15(old.get_m15() + snapshot.get_m15());
+                    }
+                }
+            }
+        }
+        updateMetricCounters(meta, metaCounters);
+    }
+
+    /**
+     * histograms are sampled, but we just update points
+     */
+    public void mergeHistograms(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data,
+                                Map<String, Integer> metaCounters, Map<String, Map<Integer, Histogram>> histograms) {
+        Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
+        if (existing == null) {
+            metricInfo.put_to_metrics(meta, data);
+            Map<Integer, Histogram> histogramMap = new HashMap<>();
+            for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+                Histogram histogram = MetricUtils.metricSnapshot2Histogram(dataEntry.getValue());
+                histogramMap.put(dataEntry.getKey(), histogram);
+            }
+            histograms.put(meta, histogramMap);
+        } else {
+            for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+                Integer win = dataEntry.getKey();
+                MetricSnapshot snapshot = dataEntry.getValue();
+                MetricSnapshot old = existing.get(win);
+                if (old == null) {
+                    existing.put(win, snapshot);
+                    histograms.get(meta).put(win, MetricUtils.metricSnapshot2Histogram(snapshot));
+                } else {
+                    if (snapshot.get_ts() >= old.get_ts()) {
+                        old.set_ts(snapshot.get_ts());
+                        // update points
+                        MetricUtils.updateHistogramPoints(histograms.get(meta).get(win), snapshot.get_points());
+                    }
+                }
+            }
+        }
+        updateMetricCounters(meta, metaCounters);
+    }
+
+    /**
+     * timers are sampled, we just update points
+     */
+    public void mergeTimers(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data,
+                            Map<String, Integer> metaCounters, Map<String, Map<Integer, Timer>> timers) {
+        Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
+        if (existing == null) {
+            metricInfo.put_to_metrics(meta, data);
+            Map<Integer, Timer> timerMap = new HashMap<>();
+            for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+                Timer timer = MetricUtils.metricSnapshot2Timer(dataEntry.getValue());
+                timerMap.put(dataEntry.getKey(), timer);
+            }
+            timers.put(meta, timerMap);
+        } else {
+            for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+                Integer win = dataEntry.getKey();
+                MetricSnapshot snapshot = dataEntry.getValue();
+                MetricSnapshot old = existing.get(win);
+                if (old == null) {
+                    existing.put(win, snapshot);
+                    timers.get(meta).put(win, MetricUtils.metricSnapshot2Timer(snapshot));
+                } else {
+                    if (snapshot.get_ts() >= old.get_ts()) {
+                        old.set_ts(snapshot.get_ts());
+                        old.set_m1(old.get_m1() + snapshot.get_m1());
+                        old.set_m5(old.get_m5() + snapshot.get_m5());
+                        old.set_m15(old.get_m15() + snapshot.get_m15());
+
+                        // update points
+                        MetricUtils.updateTimerPoints(timers.get(meta).get(win), snapshot.get_points());
+                    }
+                }
+            }
+        }
+        updateMetricCounters(meta, metaCounters);
+    }
+
+    /**
+     * computes occurrences of specified metric name
+     */
+    protected void updateMetricCounters(String metricName, Map<String, Integer> metricNameCounters) {
+        if (metricNameCounters.containsKey(metricName)) {
+            metricNameCounters.put(metricName, metricNameCounters.get(metricName) + 1);
+        } else {
+            metricNameCounters.put(metricName, 1);
+        }
+    }
+
+    protected void adjustHistogramTimerMetrics(TopologyMetric tpMetric, Map<String, Integer> metaCounters,
+                                               Map<String, Map<Integer, Histogram>> histograms,
+                                               Map<String, Map<Integer, Timer>> timers) {
+        resetPoints(tpMetric.get_taskMetric().get_metrics());
+        resetPoints(tpMetric.get_streamMetric().get_metrics());
+        resetPoints(tpMetric.get_nettyMetric().get_metrics());
+        resetPoints(tpMetric.get_workerMetric().get_metrics());
+
+        Map<String, Map<Integer, MetricSnapshot>> compMetrics =
+                tpMetric.get_componentMetric().get_metrics();
+        Map<String, Map<Integer, MetricSnapshot>> topologyMetrics =
+                tpMetric.get_topologyMetric().get_metrics();
+
+        adjustMetrics(compMetrics, metaCounters, histograms, timers);
+        adjustMetrics(topologyMetrics, metaCounters, histograms, timers);
+    }
+
+    private void adjustMetrics(Map<String, Map<Integer, MetricSnapshot>> metrics, Map<String, Integer> metaCounters,
+                               Map<String, Map<Integer, Histogram>> histograms, Map<String, Map<Integer, Timer>> timers) {
+        for (Map.Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metrics.entrySet()) {
+            String meta = metricEntry.getKey();
+            MetricType metricType = MetricUtils.metricType(meta);
+            MetaType metaType = MetricUtils.metaType(meta);
+            Map<Integer, MetricSnapshot> winData = metricEntry.getValue();
+
+            if (metricType == MetricType.HISTOGRAM) {
+                for (Map.Entry<Integer, MetricSnapshot> dataEntry : winData.entrySet()) {
+                    MetricSnapshot snapshot = dataEntry.getValue();
+                    Integer cnt = metaCounters.get(meta);
+                    Histogram histogram = histograms.get(meta).get(dataEntry.getKey());
+                    if (cnt != null && cnt > 1) {
+
+                        Snapshot snapshot1 = histogram.getSnapshot();
+                        snapshot.set_mean(snapshot1.getMean());
+                        snapshot.set_p50(snapshot1.getMedian());
+                        snapshot.set_p75(snapshot1.get75thPercentile());
+                        snapshot.set_p95(snapshot1.get95thPercentile());
+                        snapshot.set_p98(snapshot1.get98thPercentile());
+                        snapshot.set_p99(snapshot1.get99thPercentile());
+                        snapshot.set_p999(snapshot1.get999thPercentile());
+                        snapshot.set_stddev(snapshot1.getStdDev());
+                        snapshot.set_min(snapshot1.getMin());
+                        snapshot.set_max(snapshot1.getMax());
+
+                        if (metaType == MetaType.TOPOLOGY) {
+                            snapshot.set_points(Arrays.asList(ArrayUtils.toObject(snapshot1.getValues())));
+                        }
+                    }
+                    if (metaType != MetaType.TOPOLOGY) {
+                        snapshot.set_points(new ArrayList<Long>(0));
+                    }
+                }
+
+            } else if (metricType == MetricType.TIMER) {
+                for (Map.Entry<Integer, MetricSnapshot> dataEntry : winData.entrySet()) {
+                    MetricSnapshot snapshot = dataEntry.getValue();
+                    Integer cnt = metaCounters.get(meta);
+                    if (cnt != null && cnt > 1) {
+                        Timer timer = timers.get(meta).get(dataEntry.getKey());
+                        Snapshot snapshot1 = timer.getSnapshot();
+                        snapshot.set_p50(snapshot1.getMedian());
+                        snapshot.set_p75(snapshot1.get75thPercentile());
+                        snapshot.set_p95(snapshot1.get95thPercentile());
+                        snapshot.set_p98(snapshot1.get98thPercentile());
+                        snapshot.set_p99(snapshot1.get99thPercentile());
+                        snapshot.set_p999(snapshot1.get999thPercentile());
+                        snapshot.set_stddev(snapshot1.getStdDev());
+                        snapshot.set_min(snapshot1.getMin());
+                        snapshot.set_max(snapshot1.getMax());
+                    }
+                    snapshot.set_points(new ArrayList<Long>(0));
+                }
+            }
+        }
+    }
+
+    private void resetPoints(Map<String, Map<Integer, MetricSnapshot>> metrics) {
+        for (Map.Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metrics.entrySet()) {
+            String meta = metricEntry.getKey();
+            MetricType metricType = MetricUtils.metricType(meta);
+            Map<Integer, MetricSnapshot> winData = metricEntry.getValue();
+
+            if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) {
+                for (MetricSnapshot snapshot : winData.values()) {
+                    snapshot.set_points(new ArrayList<Long>(0));
+                }
+            }
+        }
+    }
+
+    protected void adjustCounterMetrics(TopologyMetric tpMetric, TopologyMetric oldMetric) {
+        if (oldMetric != null) {
+            mergeCounters(tpMetric.get_streamMetric().get_metrics(),
+                    oldMetric.get_streamMetric().get_metrics());
+
+            mergeCounters(tpMetric.get_taskMetric().get_metrics(),
+                    oldMetric.get_taskMetric().get_metrics());
+
+            mergeCounters(tpMetric.get_componentMetric().get_metrics(),
+                    oldMetric.get_componentMetric().get_metrics());
+
+            mergeCounters(tpMetric.get_workerMetric().get_metrics(),
+                    oldMetric.get_workerMetric().get_metrics());
+
+            mergeCounters(tpMetric.get_nettyMetric().get_metrics(),
+                    oldMetric.get_nettyMetric().get_metrics());
+        }
+    }
+
+    /**
+     * sum old counter snapshots and new counter snapshots, sums are stored in new snapshots.
+     */
+    private void mergeCounters(Map<String, Map<Integer, MetricSnapshot>> newCounters,
+                               Map<String, Map<Integer, MetricSnapshot>> oldCounters) {
+        for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : newCounters.entrySet()) {
+            String metricName = entry.getKey();
+            Map<Integer, MetricSnapshot> snapshots = entry.getValue();
+            Map<Integer, MetricSnapshot> oldSnapshots = oldCounters.get(metricName);
+            if (oldSnapshots != null && oldSnapshots.size() > 0) {
+                for (Map.Entry<Integer, MetricSnapshot> snapshotEntry : snapshots.entrySet()) {
+                    Integer win = snapshotEntry.getKey();
+                    MetricSnapshot snapshot = snapshotEntry.getValue();
+                    MetricSnapshot oldSnapshot = oldSnapshots.get(win);
+                    if (oldSnapshot != null) {
+                        snapshot.set_longValue(snapshot.get_longValue() + oldSnapshot.get_longValue());
+                    }
+                }
+            }
+        }
+    }
+
+    private double getSampleRate() {
+        return ConfigExtension.getMetricSampleRate(conf);
+    }
+
+}
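
To situate the merge flow above, here is a hedged sketch of how a TopologyMaster-side caller might drive this class; the upload entry point, the worker-slot key format, and the hand-off of the merged result are assumptions, not shown in this patch:

    import java.util.Map;
    import java.util.Set;

    import backtype.storm.generated.MetricInfo;
    import backtype.storm.generated.TopologyMetric;
    import com.alibaba.jstorm.metric.TopologyMetricContext;
    import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;

    public class TopologyMetricMergeSketch {
        public static TopologyMetricContext newContext(String topologyId, Set<ResourceWorkerSlot> workers, Map conf) {
            return new TopologyMetricContext(topologyId, workers, conf);
        }

        // Assumed entry point: called whenever a worker uploads its MetricInfo.
        public static TopologyMetric onWorkerUpload(TopologyMetricContext ctx, String workerSlotKey, MetricInfo metricInfo) {
            ctx.addToMemCache(workerSlotKey, metricInfo);
            // merge only once every worker has reported and no merge is already in flight
            if (ctx.readyToUpload() && !ctx.isMerging()) {
                return ctx.mergeMetrics();   // returns null when the cache is empty
            }
            return null;
        }
    }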

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java
index eabcd44..71e7cac 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java
@@ -31,11 +31,10 @@ public class JstormEvent {
         this.msgId = msgId;
     }
 
-    public final static EventFactory<JstormEvent> EVENT_FACTORY =
-            new EventFactory<JstormEvent>() {
-                public JstormEvent newInstance() {
-                    return new JstormEvent();
-                }
-            };
+    public final static EventFactory<JstormEvent> EVENT_FACTORY = new EventFactory<JstormEvent>() {
+        public JstormEvent newInstance() {
+            return new JstormEvent();
+        }
+    };
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java
index dfc43d1..107441d 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java
@@ -33,8 +33,7 @@ public class JstormEventHandler implements EventHandler {
     }
 
     @Override
-    public void onEvent(Object event, long sequence, boolean endOfBatch)
-            throws Exception {
+    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
         long msgId = Long.parseLong(((JstormEvent) event).getMsgId());
         // if (msgId % size ==0) {
         // logger.warn("consumer msgId=" + msgId + ", seq=" + sequence);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java
index 69620bd..8a05662 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java
@@ -29,31 +29,28 @@ import org.apache.commons.lang.builder.ToStringStyle;
 import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
 
 /**
- * Assignment of one Toplogy, stored in /ZK-DIR/assignments/{topologyid}
- * nodeHost {supervisorid: hostname} -- assigned supervisor Map
- * taskStartTimeSecs: {taskid, taskStartSeconds} masterCodeDir: topology source
- * code's dir in Nimbus taskToResource: {taskid, ResourceAssignment}
+ * Assignment of one Topology, stored in /ZK-DIR/assignments/{topologyid} nodeHost {supervisorid: hostname} -- assigned supervisor Map taskStartTimeSecs:
+ * {taskid, taskStartSeconds} masterCodeDir: topology source code's dir in Nimbus taskToResource: {taskid, ResourceAssignment}
  * 
  * @author Lixin/Longda
  */
 public class Assignment implements Serializable {
     public enum AssignmentType {
-        Assign, Config
+        Assign, UpdateTopology, ScaleTopology
     }
 
     private static final long serialVersionUID = 6087667851333314069L;
 
     private final String masterCodeDir;
     /**
-     * @@@ nodeHost store <supervisorId, hostname>, this will waste some zk
-     *     storage
+     * @@@ nodeHost store <supervisorId, hostname>, this will waste some zk storage
      */
     private final Map<String, String> nodeHost;
     private final Map<Integer, Integer> taskStartTimeSecs;
     private final Set<ResourceWorkerSlot> workers;
 
     private long timeStamp;
-    
+
     private AssignmentType type;
 
     public Assignment() {
@@ -64,10 +61,8 @@ public class Assignment implements Serializable {
         this.timeStamp = System.currentTimeMillis();
         this.type = AssignmentType.Assign;
     }
-    
-    public Assignment(String masterCodeDir, Set<ResourceWorkerSlot> workers,
-            Map<String, String> nodeHost,
-            Map<Integer, Integer> taskStartTimeSecs) {
+
+    public Assignment(String masterCodeDir, Set<ResourceWorkerSlot> workers, Map<String, String> nodeHost, Map<Integer, Integer> taskStartTimeSecs) {
         this.workers = workers;
         this.nodeHost = nodeHost;
         this.taskStartTimeSecs = taskStartTimeSecs;
@@ -79,11 +74,11 @@ public class Assignment implements Serializable {
     public void setAssignmentType(AssignmentType type) {
         this.type = type;
     }
-    
+
     public AssignmentType getAssignmentType() {
         return type;
     }
-    
+
     public Map<String, String> getNodeHost() {
         return nodeHost;
     }
@@ -106,11 +101,9 @@ public class Assignment implements Serializable {
      * @param supervisorId
      * @return Map<Integer, WorkerSlot>
      */
-    public Map<Integer, ResourceWorkerSlot> getTaskToNodePortbyNode(
-            String supervisorId) {
+    public Map<Integer, ResourceWorkerSlot> getTaskToNodePortbyNode(String supervisorId) {
 
-        Map<Integer, ResourceWorkerSlot> result =
-                new HashMap<Integer, ResourceWorkerSlot>();
+        Map<Integer, ResourceWorkerSlot> result = new HashMap<Integer, ResourceWorkerSlot>();
         for (ResourceWorkerSlot worker : workers) {
             if (worker.getNodeId().equals(supervisorId)) {
                 result.put(worker.getPort(), worker);
@@ -144,8 +137,7 @@ public class Assignment implements Serializable {
     public Set<Integer> getCurrentWorkerTasks(String supervisorId, int port) {
 
         for (ResourceWorkerSlot worker : workers) {
-            if (worker.getNodeId().equals(supervisorId)
-                    && worker.getPort() == port)
+            if (worker.getNodeId().equals(supervisorId) && worker.getPort() == port)
                 return worker.getTasks();
         }
 
@@ -164,26 +156,24 @@ public class Assignment implements Serializable {
         return this.timeStamp;
     }
 
+    public boolean isTopologyChange(long oldTimeStamp) {
+        boolean isChange = false;
+        if (timeStamp > oldTimeStamp && (type.equals(AssignmentType.UpdateTopology) || type.equals(AssignmentType.ScaleTopology)))
+            isChange = true;
+        return isChange;
+    }
+
     public void updateTimeStamp() {
         timeStamp = System.currentTimeMillis();
     }
-    
+
     @Override
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result =
-                prime
-                        * result
-                        + ((masterCodeDir == null) ? 0 : masterCodeDir
-                                .hashCode());
-        result =
-                prime * result + ((nodeHost == null) ? 0 : nodeHost.hashCode());
-        result =
-                prime
-                        * result
-                        + ((taskStartTimeSecs == null) ? 0 : taskStartTimeSecs
-                                .hashCode());
+        result = prime * result + ((masterCodeDir == null) ? 0 : masterCodeDir.hashCode());
+        result = prime * result + ((nodeHost == null) ? 0 : nodeHost.hashCode());
+        result = prime * result + ((taskStartTimeSecs == null) ? 0 : taskStartTimeSecs.hashCode());
         result = prime * result + ((workers == null) ? 0 : workers.hashCode());
         result = prime * result + (int) (timeStamp & 0xFFFFFFFF);
         return result;
@@ -225,8 +215,7 @@ public class Assignment implements Serializable {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 
 }
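
The new isTopologyChange check is what FollowerRunnable (later in this patch) uses to decide whether topology code must be re-downloaded; a minimal sketch of that decision, where the locally recorded download timestamp is an assumption standing in for StormConfig.read_nimbus_topology_timestamp(...):

    import com.alibaba.jstorm.schedule.Assignment;

    public class AssignmentChangeSketch {
        // Returns true when the assignment is newer than the local code-download
        // timestamp and marks an UpdateTopology/ScaleTopology change.
        static boolean needsRedownload(Assignment assignment, long localCodeDownTs) {
            return assignment != null && assignment.isTopologyChange(localCodeDownTs);
        }
    }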

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java
index 3e7a770..4b9bbb3 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java
@@ -32,8 +32,7 @@ public class AssignmentBak implements Serializable {
     private final Map<String, List<Integer>> componentTasks;
     private final Assignment assignment;
 
-    public AssignmentBak(Map<String, List<Integer>> componentTasks,
-            Assignment assignment) {
+    public AssignmentBak(Map<String, List<Integer>> componentTasks, Assignment assignment) {
         super();
         this.componentTasks = componentTasks;
         this.assignment = assignment;
@@ -49,7 +48,6 @@ public class AssignmentBak implements Serializable {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java
index d73adfd..77bb883 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java
@@ -62,8 +62,7 @@ public class CleanRunnable implements Runnable {
                 try {
                     f.delete();
                 } catch (Exception e) {
-                    log.error("Cleaning inbox ... error deleting:"
-                            + f.getName() + "," + e);
+                    log.error("Cleaning inbox ... error deleting:" + f.getName() + "," + e);
                 }
             } else {
                 clean(f);
@@ -72,8 +71,7 @@ public class CleanRunnable implements Runnable {
                     try {
                         f.delete();
                     } catch (Exception e) {
-                        log.error("Cleaning inbox ... error deleting:"
-                                + f.getName() + "," + e);
+                        log.error("Cleaning inbox ... error deleting:" + f.getName() + "," + e);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java
index 14d38d8..a9683fd 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java
@@ -28,8 +28,7 @@ public class DelayEventRunnable implements Runnable {
     private StatusType status;
     private Object[] args;
 
-    public DelayEventRunnable(NimbusData data, String topologyid,
-            StatusType status, Object[] args) {
+    public DelayEventRunnable(NimbusData data, String topologyid, StatusType status, Object[] args) {
         this.data = data;
         this.topologyid = topologyid;
         this.status = status;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java
index e62c61b..d87b65e 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java
@@ -18,21 +18,21 @@
 package com.alibaba.jstorm.schedule;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.commons.io.FileExistsException;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.callback.RunnableCallback;
 import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.cluster.Cluster;
@@ -44,10 +44,12 @@ import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.NetWorkUtils;
 import com.alibaba.jstorm.utils.PathUtils;
 
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
 public class FollowerRunnable implements Runnable {
 
-    private static final Logger LOG = LoggerFactory
-            .getLogger(FollowerRunnable.class);
+    private static final Logger LOG = LoggerFactory.getLogger(FollowerRunnable.class);
 
     private NimbusData data;
 
@@ -59,50 +61,62 @@ public class FollowerRunnable implements Runnable {
 
     private final String hostPort;
 
+    public static final String NIMBUS_DIFFER_COUNT_ZK = "nimbus.differ.count.zk";
+
+    public static final Integer SLAVE_NIMBUS_WAIT_TIME = 120;
+
     @SuppressWarnings("unchecked")
     public FollowerRunnable(final NimbusData data, int sleepTime) {
         this.data = data;
         this.sleepTime = sleepTime;
+
         if (!ConfigExtension.isNimbusUseIp(data.getConf())) {
-            this.hostPort =
-                    NetWorkUtils.hostname()
-                            + ":"
-                            + String.valueOf(Utils.getInt(data.getConf().get(
-                                    Config.NIMBUS_THRIFT_PORT)));
+            this.hostPort = NetWorkUtils.hostname() + ":" + String.valueOf(Utils.getInt(data.getConf().get(Config.NIMBUS_THRIFT_PORT)));
         } else {
-            this.hostPort =
-                    NetWorkUtils.ip()
-                            + ":"
-                            + String.valueOf(Utils.getInt(data.getConf().get(
-                                    Config.NIMBUS_THRIFT_PORT)));
+            this.hostPort = NetWorkUtils.ip() + ":" + String.valueOf(Utils.getInt(data.getConf().get(Config.NIMBUS_THRIFT_PORT)));
         }
         try {
+
             String[] hostfigs = this.hostPort.split(":");
             boolean isLocaliP = false;
-            if(hostfigs.length > 0){
+            if (hostfigs.length > 0) {
                 isLocaliP = hostfigs[0].equals("127.0.0.1");
             }
-            if(isLocaliP){
+            if (isLocaliP) {
                 throw new Exception("the hostname which Nimbus get is localhost");
             }
-        }catch(Exception e1){
-            LOG.error("get nimbus host error!", e1);
-            throw new RuntimeException(e1);
-        }
-        try {
-            this.tryToBeLeader(data.getConf());
         } catch (Exception e1) {
-            // TODO Auto-generated catch block
-            LOG.error("try to be leader error.", e1);
+            LOG.error("get nimbus host error!", e1);
             throw new RuntimeException(e1);
         }
+
         try {
-            data.getStormClusterState().update_nimbus_slave(hostPort,
-                    data.uptime());
+            data.getStormClusterState().update_nimbus_slave(hostPort, data.uptime());
+            data.getStormClusterState().update_nimbus_detail(hostPort, null);
         } catch (Exception e) {
             LOG.error("register nimbus host fail!", e);
             throw new RuntimeException();
         }
+        try{
+            update_nimbus_detail();
+        }catch (Exception e){
+            LOG.error("register detail of nimbus fail!", e);
+            throw new RuntimeException();
+        }
+        try {
+            this.tryToBeLeader(data.getConf());
+        } catch (Exception e1) {
+            try {
+                data.getStormClusterState().unregister_nimbus_host(hostPort);
+                data.getStormClusterState().unregister_nimbus_detail(hostPort);
+            }catch (Exception e2){
+                LOG.info("due to task errors, removing registered nimbus information");
+            }finally {
+                // TODO Auto-generated catch block
+                LOG.error("try to be leader error.", e1);
+                throw new RuntimeException(e1);
+            }
+        }
         callback = new RunnableCallback() {
             @Override
             public void run() {
@@ -121,6 +135,8 @@ public class FollowerRunnable implements Runnable {
             return true;
         }
 
+        // Two nimbus instances running on the same node are not allowed,
+        // so checking the ip alone is enough here
         String[] part = zkMaster.split(":");
         return NetWorkUtils.equals(part[0], NetWorkUtils.ip());
     }
@@ -143,20 +159,21 @@ public class FollowerRunnable implements Runnable {
                 if (data.isLeader() == true) {
                     if (isZkLeader == false) {
                         LOG.info("New ZK master is " + master);
-                        JStormUtils.halt_process(1,
-                                "Lose ZK master node, halt process");
+                        JStormUtils.halt_process(1, "Lose ZK master node, halt process");
                         return;
                     }
                 }
 
                 if (isZkLeader == true) {
                     zkClusterState.unregister_nimbus_host(hostPort);
+                    zkClusterState.unregister_nimbus_detail(hostPort);
                     data.setLeader(true);
                     continue;
                 }
 
                 check();
                 zkClusterState.update_nimbus_slave(hostPort, data.uptime());
+                update_nimbus_detail();
             } catch (InterruptedException e) {
                 // TODO Auto-generated catch block
                 continue;
@@ -178,14 +195,29 @@ public class FollowerRunnable implements Runnable {
         StormClusterState clusterState = data.getStormClusterState();
 
         try {
-            String master_stormdist_root =
-                    StormConfig.masterStormdistRoot(data.getConf());
+            String master_stormdist_root = StormConfig.masterStormdistRoot(data.getConf());
 
-            List<String> code_ids =
-                    PathUtils.read_dir_contents(master_stormdist_root);
+            List<String> code_ids = PathUtils.read_dir_contents(master_stormdist_root);
 
             List<String> assignments_ids = clusterState.assignments(callback);
 
+            Map<String, Assignment> assignmentMap = new HashMap<String, Assignment>();
+            List<String> update_ids = new ArrayList<String>();
+            for (String id : assignments_ids) {
+                Assignment assignment = clusterState.assignment_info(id, null);
+                Long localCodeDownTS;
+                try {
+                    Long tmp = StormConfig.read_nimbus_topology_timestamp(data.getConf(), id);
+                    localCodeDownTS = (tmp == null ? 0L : tmp);
+                } catch (FileNotFoundException e) {
+                    localCodeDownTS = 0L;
+                }
+                if (assignment != null && assignment.isTopologyChange(localCodeDownTS.longValue())) {
+                    update_ids.add(id);
+                }
+                assignmentMap.put(id, assignment);
+            }
+
             List<String> done_ids = new ArrayList<String>();
 
             for (String id : code_ids) {
@@ -199,13 +231,15 @@ public class FollowerRunnable implements Runnable {
                 code_ids.remove(id);
             }
 
+            // re-download topology ids which have been updated
+            assignments_ids.addAll(update_ids);
+
             for (String topologyId : code_ids) {
                 deleteLocalTopology(topologyId);
             }
 
             for (String id : assignments_ids) {
-                Assignment assignment = clusterState.assignment_info(id, null);
-                downloadCodeFromMaster(assignment, id);
+                downloadCodeFromMaster(assignmentMap.get(id), id);
             }
         } catch (IOException e) {
             // TODO Auto-generated catch block
@@ -219,8 +253,7 @@ public class FollowerRunnable implements Runnable {
     }
 
     private void deleteLocalTopology(String topologyId) throws IOException {
-        String dir_to_delete =
-                StormConfig.masterStormdistRoot(data.getConf(), topologyId);
+        String dir_to_delete = StormConfig.masterStormdistRoot(data.getConf(), topologyId);
         try {
             PathUtils.rmr(dir_to_delete);
             LOG.info("delete:" + dir_to_delete + "successfully!");
@@ -230,47 +263,113 @@ public class FollowerRunnable implements Runnable {
         }
     }
 
-    private void downloadCodeFromMaster(Assignment assignment, String topologyId)
-            throws IOException, TException {
+    private void downloadCodeFromMaster(Assignment assignment, String topologyId) throws IOException, TException {
         try {
-            String localRoot =
-                    StormConfig.masterStormdistRoot(data.getConf(), topologyId);
-            String tmpDir =
-                    StormConfig.masterInbox(data.getConf()) + "/"
-                            + UUID.randomUUID().toString();
+            String localRoot = StormConfig.masterStormdistRoot(data.getConf(), topologyId);
+            String tmpDir = StormConfig.masterInbox(data.getConf()) + "/" + UUID.randomUUID().toString();
             String masterCodeDir = assignment.getMasterCodeDir();
-            JStormServerUtils.downloadCodeFromMaster(data.getConf(), tmpDir,
-                    masterCodeDir, topologyId, false);
+            JStormServerUtils.downloadCodeFromMaster(data.getConf(), tmpDir, masterCodeDir, topologyId, false);
 
-            FileUtils.moveDirectory(new File(tmpDir), new File(localRoot));
+            File srcDir = new File(tmpDir);
+            File destDir = new File(localRoot);
+            try {
+                FileUtils.moveDirectory(srcDir, destDir);
+            } catch (FileExistsException e) {
+                FileUtils.copyDirectory(srcDir, destDir);
+                FileUtils.deleteQuietly(srcDir);
+            }
+            // Update downloadCode timeStamp
+            StormConfig.write_nimbus_topology_timestamp(data.getConf(), topologyId, System.currentTimeMillis());
         } catch (TException e) {
             // TODO Auto-generated catch block
-            LOG.error(e + " downloadStormCode failed " + "topologyId:"
-                    + topologyId + "masterCodeDir:"
-                    + assignment.getMasterCodeDir());
+            LOG.error(e + " downloadStormCode failed " + "topologyId:" + topologyId + "masterCodeDir:" + assignment.getMasterCodeDir());
             throw e;
         }
-        LOG.info("Finished downloading code for topology id " + topologyId
-                + " from " + assignment.getMasterCodeDir());
+        LOG.info("Finished downloading code for topology id " + topologyId + " from " + assignment.getMasterCodeDir());
     }
 
     private void tryToBeLeader(final Map conf) throws Exception {
-        RunnableCallback masterCallback = new RunnableCallback() {
-            @Override
-            public void run() {
-                try {
-                    tryToBeLeader(conf);
-                } catch (Exception e) {
-                    LOG.error("To be master error", e);
-                    JStormUtils.halt_process(30,
-                            "Cant't to be master" + e.getMessage());
+        boolean allowed = check_nimbus_priority();
+        
+        if (allowed){
+            RunnableCallback masterCallback = new RunnableCallback() {
+                @Override
+                public void run() {
+                    try {
+                        tryToBeLeader(conf);
+                    } catch (Exception e) {
+                        LOG.error("To be master error", e);
+                        JStormUtils.halt_process(30, "Cant't to be master" + e.getMessage());
+                    }
                 }
-            }
-        };
-        data.getStormClusterState().try_to_be_leader(Cluster.MASTER_SUBTREE,
-                hostPort, masterCallback);
+            };
+            LOG.info("This nimbus can be  leader");
+            data.getStormClusterState().try_to_be_leader(Cluster.MASTER_SUBTREE, hostPort, masterCallback);
+        }else {
+        	LOG.info("This nimbus can't be leader");
+        }
     }
+    /**
+     * Compared with other nimbus ,get priority of this nimbus
+     *
+     * @throws Exception
+     */
+    private  boolean check_nimbus_priority() throws Exception {
+    	
+    	int gap = update_nimbus_detail();
+    	if (gap == 0) {
+    		return true;
+    	}
+    	
+    	int left = SLAVE_NIMBUS_WAIT_TIME;
+        while(left > 0) {
+        	LOG.info( "After " + left + " seconds, nimbus will try to be Leader!");
+            Thread.sleep(10 * 1000);
+            left -= 10;
+        }
+
+        StormClusterState zkClusterState = data.getStormClusterState();
+
+        List<String> followers = zkClusterState.list_dirs(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE, false);
+        if (followers == null || followers.size() == 0) {
+            return false;
+        }
 
+        for (String follower : followers) {
+            if (follower != null && !follower.equals(hostPort)) {
+                Map bMap = zkClusterState.get_nimbus_detail(follower, false);
+                if (bMap != null){
+                    Object object = bMap.get(NIMBUS_DIFFER_COUNT_ZK);
+                    if (object != null && (JStormUtils.parseInt(object)) < gap){
+                    	LOG.info("Current node can't be leader, due to {} has higher priority", follower);
+                        return false;
+                    }
+                }
+            }
+        }
+        
+        
+        
+        return true;
+    }
+    private int update_nimbus_detail() throws Exception {
+
+        //update count = count of zk's binary files - count of nimbus's binary files
+        StormClusterState zkClusterState = data.getStormClusterState();
+        String master_stormdist_root = StormConfig.masterStormdistRoot(data.getConf());
+        List<String> code_ids = PathUtils.read_dir_contents(master_stormdist_root);
+        List<String> assignments_ids = data.getStormClusterState().assignments(callback);
+        assignments_ids.removeAll(code_ids);
+
+        Map mtmp = zkClusterState.get_nimbus_detail(hostPort, false);
+        if (mtmp == null){
+            mtmp = new HashMap();
+        }
+        mtmp.put(NIMBUS_DIFFER_COUNT_ZK, assignments_ids.size());
+        zkClusterState.update_nimbus_detail(hostPort, mtmp);
+        LOG.debug("update nimbus's detail " + mtmp);
+        return assignments_ids.size();
+    }
     /**
      * Check whether current node is master or not
      * 
@@ -291,13 +390,11 @@ public class FollowerRunnable implements Runnable {
                 // current process own master
                 return;
             }
-            LOG.warn("Current Nimbus has start thrift, but fail to own zk master :"
-                    + zkHost);
+            LOG.warn("Current Nimbus has start thrift, but fail to own zk master :" + zkHost);
         }
 
         // current process doesn't own master
-        String err =
-                "Current Nimubs fail to own nimbus_master, should halt process";
+        String err = "Current Nimubs fail to own nimbus_master, should halt process";
         LOG.error(err);
         JStormUtils.halt_process(0, err);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java
index a9d9b92..a6e6093 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java
@@ -26,6 +26,5 @@ import com.alibaba.jstorm.utils.FailedAssignTopologyException;
 public interface IToplogyScheduler {
     void prepare(Map conf);
 
-    Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext contex)
-            throws FailedAssignTopologyException;
+    Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext contex) throws FailedAssignTopologyException;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java
index 8342b79..b7c8e89 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java
@@ -17,27 +17,28 @@
  */
 package com.alibaba.jstorm.schedule;
 
-import java.util.Date;
-import java.util.List;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import backtype.storm.Config;
+import backtype.storm.generated.TaskHeartbeat;
+import backtype.storm.generated.TopologyTaskHbInfo;
 
 import com.alibaba.jstorm.cluster.StormClusterState;
 import com.alibaba.jstorm.daemon.nimbus.NimbusData;
 import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
 import com.alibaba.jstorm.daemon.nimbus.StatusType;
 import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
+import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.TimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.TaskDeadEvent;
+
+import java.util.*;
 
 /**
- * 
- * Scan all task's heartbeat, if task isn't alive, DO
- * NimbusUtils.transition(monitor)
+ * Scan all task's heartbeat, if task isn't alive, DO NimbusUtils.transition(monitor)
  * 
  * @author Longda
- * 
  */
 public class MonitorRunnable implements Runnable {
     private static Logger LOG = LoggerFactory.getLogger(MonitorRunnable.class);
@@ -49,9 +50,7 @@ public class MonitorRunnable implements Runnable {
     }
 
     /**
-     * @@@ Todo when one topology is being reassigned, the topology should be
-     *     skip check
-     * @param data
+     * @@@ Todo when one topology is being reassigned, the topology should skip check
      */
     @Override
     public void run() {
@@ -80,44 +79,84 @@ public class MonitorRunnable implements Runnable {
                     LOG.info("Failed to get task ids of " + topologyid);
                     continue;
                 }
+                Assignment assignment = clusterState.assignment_info(topologyid, null);
 
+                Set<Integer> deadTasks = new HashSet<Integer>();
                 boolean needReassign = false;
                 for (Integer task : taskIds) {
-                    boolean isTaskDead =
-                            NimbusUtils.isTaskDead(data, topologyid, task);
+                    boolean isTaskDead = NimbusUtils.isTaskDead(data, topologyid, task);
                     if (isTaskDead == true) {
-                        LOG.info("Found " + topologyid + ",taskid:" + task
-                                + " is dead");
-
-                        ResourceWorkerSlot resource = null;
-                        Assignment assignment =
-                                clusterState.assignment_info(topologyid, null);
-                        if (assignment != null)
-                            resource = assignment.getWorkerByTaskId(task);
-                        if (resource != null) {
-                            Date now = new Date();
-                            String nowStr = TimeFormat.getSecond(now);
-                            String errorInfo =
-                                    "Task-" + task + " is dead on "
-                                            + resource.getHostname() + ":"
-                                            + resource.getPort() + ", "
-                                            + nowStr;
-                            LOG.info(errorInfo);
-                            clusterState.report_task_error(topologyid, task,
-                                    errorInfo);
-                        }
+                        deadTasks.add(task);
                         needReassign = true;
-                        break;
                     }
                 }
+
+
+                TopologyTaskHbInfo topologyHbInfo = data.getTasksHeartbeat().get(topologyid);
                 if (needReassign == true) {
-                    NimbusUtils.transition(data, topologyid, false,
-                            StatusType.monitor);
+                    if (topologyHbInfo != null) {
+                        int topologyMasterId = topologyHbInfo.get_topologyMasterId();
+                        if (deadTasks.contains(topologyMasterId)) {
+                            deadTasks.clear();
+                            if (assignment != null) {
+                                ResourceWorkerSlot resource = assignment.getWorkerByTaskId(topologyMasterId);
+                                if (resource != null)
+                                    deadTasks.addAll(resource.getTasks());
+                                else
+                                    deadTasks.add(topologyMasterId);
+                            }
+                        } else {
+                            Map<Integer, TaskHeartbeat> taskHbs = topologyHbInfo.get_taskHbs();
+                            int launchTime = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_LAUNCH_SECS));
+                            if (taskHbs == null || taskHbs.get(topologyMasterId) == null || taskHbs.get(topologyMasterId).get_uptime() < launchTime) {
+                                /*try {
+                                    clusterState.topology_heartbeat(topologyid, topologyHbInfo);
+                                } catch (Exception e) {
+                                    LOG.error("Failed to update task heartbeat info to ZK for " + topologyid, e);
+                                }*/
+                                return;
+                            }
+                        }
+                        Map<Integer, ResourceWorkerSlot> deadTaskWorkers = new HashMap<>();
+                        for (Integer task : deadTasks) {
+                            LOG.info("Found " + topologyid + ",taskid:" + task + " is dead");
+
+                            ResourceWorkerSlot resource = null;
+                            if (assignment != null)
+                                resource = assignment.getWorkerByTaskId(task);
+                            if (resource != null) {
+                                deadTaskWorkers.put(task, resource);
+                                Date now = new Date();
+                                String nowStr = TimeFormat.getSecond(now);
+                                String errorInfo = "Task-" + task + " is dead on " + resource.getHostname() + ":" + resource.getPort() + ", " + nowStr;
+                                LOG.info(errorInfo);
+                                clusterState.report_task_error(topologyid, task, errorInfo, null);
+                            }
+                        }
+
+                        if (deadTaskWorkers.size() > 0) {
+                            // notify jstorm monitor
+                            TaskDeadEvent event = new TaskDeadEvent();
+                            event.clusterName = data.getClusterName();
+                            event.topologyId = topologyid;
+                            event.deadTasks = deadTaskWorkers;
+                            event.timestamp = System.currentTimeMillis();
+                            data.getMetricRunnable().pushEvent(event);
+                        }
+                    }
+                    NimbusUtils.transition(data, topologyid, false, StatusType.monitor);
+                }
+                
+                if (topologyHbInfo != null) {
+                    try {
+                        clusterState.topology_heartbeat(topologyid, topologyHbInfo);
+                    } catch (Exception e) {
+                        LOG.error("Failed to update task heartbeat info to ZK for " + topologyid, e);
+                    }
                 }
             }
 
         } catch (Exception e) {
-            // TODO Auto-generated catch block
             LOG.error(e.getMessage(), e);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java
index 12cdad0..9a3a879 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java
@@ -38,6 +38,8 @@ public class TopologyAssignContext {
     public static final int ASSIGN_TYPE_MONITOR = 2; // monitor a topology, some
                                                      // tasks are dead
 
+    protected String topologyId;
+
     protected int assignType;
 
     protected StormTopology rawTopology;
@@ -51,6 +53,9 @@ public class TopologyAssignContext {
 
     protected Map<String, SupervisorInfo> cluster;
 
+    protected int topoMasterTaskId;
+    protected boolean assignSingleWorkerForTM = false;
+
     protected Map<Integer, String> taskToComponent;
 
     protected Set<Integer> allTaskIds; // all tasks
@@ -76,6 +81,17 @@ public class TopologyAssignContext {
         this.deadTaskIds = copy.getDeadTaskIds();
         this.unstoppedTaskIds = copy.getUnstoppedTaskIds();
         this.isReassign = copy.isReassign();
+        this.topologyId = copy.getTopologyId();
+        this.topoMasterTaskId = copy.getTopologyMasterTaskId();
+        this.assignSingleWorkerForTM = copy.getAssignSingleWorkerForTM();
+    }
+
+    public String getTopologyId() {
+        return topologyId;
+    }
+
+    public void setTopologyId(String topologyId) {
+        this.topologyId = topologyId;
     }
 
     public int getAssignType() {
@@ -151,8 +167,7 @@ public class TopologyAssignContext {
     }
 
     public static boolean isAssignTypeValid(int type) {
-        return (type == ASSIGN_TYPE_NEW) || (type == ASSIGN_TYPE_REBALANCE)
-                || (type == ASSIGN_TYPE_MONITOR);
+        return (type == ASSIGN_TYPE_NEW) || (type == ASSIGN_TYPE_REBALANCE) || (type == ASSIGN_TYPE_MONITOR);
     }
 
     public Set<ResourceWorkerSlot> getUnstoppedWorkers() {
@@ -171,9 +186,24 @@ public class TopologyAssignContext {
         this.isReassign = isReassign;
     }
 
+    public int getTopologyMasterTaskId() {
+        return topoMasterTaskId;
+    }
+
+    public void setTopologyMasterTaskId(int taskId) {
+        this.topoMasterTaskId = taskId;
+    }
+
+    public boolean getAssignSingleWorkerForTM() {
+        return assignSingleWorkerForTM;
+    }
+
+    public void setAssignSingleWorkerForTM(boolean isAssign) {
+        this.assignSingleWorkerForTM = isAssign;
+    }
+
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java
index 9eb2775..78649aa 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java
@@ -17,30 +17,20 @@
  */
 package com.alibaba.jstorm.schedule.default_assign;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
 import backtype.storm.Config;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
 import backtype.storm.utils.ThriftTopologyUtils;
-
+import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.cluster.Common;
 import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
 import com.alibaba.jstorm.schedule.TopologyAssignContext;
 import com.alibaba.jstorm.utils.FailedAssignTopologyException;
 import com.alibaba.jstorm.utils.JStormUtils;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+import java.util.*;
+import java.util.Map.Entry;
 
 public class DefaultTopologyAssignContext extends TopologyAssignContext {
 
@@ -49,19 +39,16 @@ public class DefaultTopologyAssignContext extends TopologyAssignContext {
     private final Map<String, List<String>> hostToSid;
     private final Set<ResourceWorkerSlot> oldWorkers;
     private final Map<String, List<Integer>> componentTasks;
-    private final Set<ResourceWorkerSlot> unstoppedWorkers =
-            new HashSet<ResourceWorkerSlot>();
+    private final Set<ResourceWorkerSlot> unstoppedWorkers = new HashSet<ResourceWorkerSlot>();
     private final int totalWorkerNum;
     private final int unstoppedWorkerNum;
 
     private int computeWorkerNum() {
-        Integer settingNum =
-                JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_WORKERS));
+        Integer settingNum = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_WORKERS));
 
-        int hintSum = 0;
+        int ret = 0, hintSum = 0, tmCount = 0;
 
-        Map<String, Object> components =
-                ThriftTopologyUtils.getComponents(sysTopology);
+        Map<String, Object> components = ThriftTopologyUtils.getComponents(sysTopology);
         for (Entry<String, Object> entry : components.entrySet()) {
             String componentName = entry.getKey();
             Object component = entry.getValue();
@@ -78,14 +65,35 @@ public class DefaultTopologyAssignContext extends TopologyAssignContext {
             }
 
             int hint = common.get_parallelism_hint();
+            if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
+                tmCount += hint;
+                continue;
+            }
             hintSum += hint;
         }
 
         if (settingNum == null) {
-            return hintSum;
+            ret = hintSum;
         } else {
-            return Math.min(settingNum, hintSum);
+            ret =  Math.min(settingNum, hintSum);
         }
+
+        Boolean isTmSingleWorker = ConfigExtension.getTopologyMasterSingleWorker(stormConf);
+        if (isTmSingleWorker != null) {
+            if (isTmSingleWorker == true) {
+                // Assign a single worker for topology master
+                ret += tmCount;
+                setAssignSingleWorkerForTM(true);
+            }
+        } else {
+            // If not configured, judge this config by worker number
+            if (ret >= 10) {
+                ret += tmCount;
+                setAssignSingleWorkerForTM(true);
+            }
+        }
+
+        return ret;
     }
 
     public int computeUnstoppedAssignments() {
@@ -149,8 +157,7 @@ public class DefaultTopologyAssignContext extends TopologyAssignContext {
         try {
             sysTopology = Common.system_topology(stormConf, rawTopology);
         } catch (Exception e) {
-            throw new FailedAssignTopologyException(
-                    "Failed to generate system topology");
+            throw new FailedAssignTopologyException("Failed to generate system topology");
         }
 
         sidToHostname = generateSidToHost();
@@ -215,7 +222,6 @@ public class DefaultTopologyAssignContext extends TopologyAssignContext {
     }
 
     public String toDetailString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java
index 5df7de4..99ba9da 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java
@@ -32,8 +32,7 @@ import com.alibaba.jstorm.schedule.TopologyAssignContext;
 import com.alibaba.jstorm.utils.FailedAssignTopologyException;
 
 public class DefaultTopologyScheduler implements IToplogyScheduler {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(DefaultTopologyScheduler.class);
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyScheduler.class);
 
     private Map nimbusConf;
 
@@ -57,8 +56,7 @@ public class DefaultTopologyScheduler implements IToplogyScheduler {
         for (Integer task : canFree) {
             ResourceWorkerSlot worker = oldAssigns.getWorkerByTaskId(task);
             if (worker == null) {
-                LOG.warn("When free rebalance resource, no ResourceAssignment of task "
-                        + task);
+                LOG.warn("When free rebalance resource, no ResourceAssignment of task " + task);
                 continue;
             }
 
@@ -79,25 +77,22 @@ public class DefaultTopologyScheduler implements IToplogyScheduler {
         } else if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) {
             needAssign.addAll(context.getAllTaskIds());
             needAssign.removeAll(context.getUnstoppedTaskIds());
-        } else {
-            // monitor
-            needAssign.addAll(context.getDeadTaskIds());
+        } else { // ASSIGN_TYPE_MONITOR
+            Set<Integer> deadTasks = context.getDeadTaskIds();
+            needAssign.addAll(deadTasks);
         }
 
         return needAssign;
     }
 
     /**
-     * Get the task Map which the task is alive and will be kept Only when type
-     * is ASSIGN_TYPE_MONITOR, it is valid
+     * Get the task Map which the task is alive and will be kept Only when type is ASSIGN_TYPE_MONITOR, it is valid
      * 
      * @param defaultContext
      * @param needAssigns
      * @return
      */
-    public Set<ResourceWorkerSlot> getKeepAssign(
-            DefaultTopologyAssignContext defaultContext,
-            Set<Integer> needAssigns) {
+    public Set<ResourceWorkerSlot> getKeepAssign(DefaultTopologyAssignContext defaultContext, Set<Integer> needAssigns) {
 
         Set<Integer> keepAssignIds = new HashSet<Integer>();
         keepAssignIds.addAll(defaultContext.getAllTaskIds());
@@ -125,21 +120,17 @@ public class DefaultTopologyScheduler implements IToplogyScheduler {
     }
 
     @Override
-    public Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext context)
-            throws FailedAssignTopologyException {
+    public Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext context) throws FailedAssignTopologyException {
 
         int assignType = context.getAssignType();
         if (TopologyAssignContext.isAssignTypeValid(assignType) == false) {
-            throw new FailedAssignTopologyException("Invalide Assign Type "
-                    + assignType);
+            throw new FailedAssignTopologyException("Invalide Assign Type " + assignType);
         }
 
-        DefaultTopologyAssignContext defaultContext =
-                new DefaultTopologyAssignContext(context);
+        DefaultTopologyAssignContext defaultContext = new DefaultTopologyAssignContext(context);
         if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) {
             /**
-             * Mark all current assigned worker as available. Current assignment
-             * will be restored in task scheduler.
+             * Mark all current assigned worker as available. Current assignment will be restored in task scheduler.
              */
             freeUsed(defaultContext);
         }
@@ -148,36 +139,24 @@ public class DefaultTopologyScheduler implements IToplogyScheduler {
 
         Set<Integer> needAssignTasks = getNeedAssignTasks(defaultContext);
 
-        Set<ResourceWorkerSlot> keepAssigns =
-                getKeepAssign(defaultContext, needAssignTasks);
+        Set<ResourceWorkerSlot> keepAssigns = getKeepAssign(defaultContext, needAssignTasks);
 
         // please use tree map to make task sequence
         Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();
         ret.addAll(keepAssigns);
         ret.addAll(defaultContext.getUnstoppedWorkers());
 
-        int allocWorkerNum =
-                defaultContext.getTotalWorkerNum()
-                        - defaultContext.getUnstoppedWorkerNum()
-                        - keepAssigns.size();
-        LOG.info("allocWorkerNum=" + allocWorkerNum + ", totalWorkerNum="
-                + defaultContext.getTotalWorkerNum());
+        int allocWorkerNum = defaultContext.getTotalWorkerNum() - defaultContext.getUnstoppedWorkerNum() - keepAssigns.size();
+        LOG.info("allocWorkerNum=" + allocWorkerNum + ", totalWorkerNum=" + defaultContext.getTotalWorkerNum() + ", keepWorkerNum=" + keepAssigns.size());
 
         if (allocWorkerNum <= 0) {
-            LOG.warn("Don't need assign workers, all workers are fine "
-                    + defaultContext.toDetailString());
-            throw new FailedAssignTopologyException(
-                    "Don't need assign worker, all workers are fine ");
+            LOG.warn("Don't need assign workers, all workers are fine " + defaultContext.toDetailString());
+            throw new FailedAssignTopologyException("Don't need assign worker, all workers are fine ");
         }
 
-        List<ResourceWorkerSlot> availableWorkers =
-                WorkerScheduler.getInstance().getAvailableWorkers(
-                        defaultContext, needAssignTasks, allocWorkerNum);
-        TaskScheduler taskScheduler =
-                new TaskScheduler(defaultContext, needAssignTasks,
-                        availableWorkers);
-        Set<ResourceWorkerSlot> assignment =
-                new HashSet<ResourceWorkerSlot>(taskScheduler.assign());
+        List<ResourceWorkerSlot> availableWorkers = WorkerScheduler.getInstance().getAvailableWorkers(defaultContext, needAssignTasks, allocWorkerNum);
+        TaskScheduler taskScheduler = new TaskScheduler(defaultContext, needAssignTasks, availableWorkers);
+        Set<ResourceWorkerSlot> assignment = new HashSet<ResourceWorkerSlot>(taskScheduler.assign());
         ret.addAll(assignment);
 
         LOG.info("Keep Alive slots:" + keepAssigns);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java
index c218f52..df8eba5 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java
@@ -39,8 +39,7 @@ import com.alibaba.jstorm.utils.NetWorkUtils;
 //one worker 's assignment
 public class ResourceWorkerSlot extends WorkerSlot implements Serializable {
 
-    public static Logger LOG = LoggerFactory
-            .getLogger(ResourceWorkerSlot.class);
+    public static Logger LOG = LoggerFactory.getLogger(ResourceWorkerSlot.class);
     private static final long serialVersionUID = 9138386287559932411L;
 
     private String hostname;
@@ -58,16 +57,14 @@ public class ResourceWorkerSlot extends WorkerSlot implements Serializable {
         super(supervisorId, port);
     }
 
-    public ResourceWorkerSlot(WorkerAssignment worker,
-            Map<String, List<Integer>> componentToTask) {
+    public ResourceWorkerSlot(WorkerAssignment worker, Map<String, List<Integer>> componentToTask) {
         super(worker.getNodeId(), worker.getPort());
         this.hostname = worker.getHostName();
         this.tasks = new HashSet<Integer>();
         this.cpu = worker.getCpu();
         this.memSize = worker.getMem();
         this.jvm = worker.getJvm();
-        for (Entry<String, Integer> entry : worker.getComponentToNum()
-                .entrySet()) {
+        for (Entry<String, Integer> entry : worker.getComponentToNum().entrySet()) {
             List<Integer> tasks = componentToTask.get(entry.getKey());
             if (tasks == null || tasks.size() == 0)
                 continue;
@@ -121,12 +118,10 @@ public class ResourceWorkerSlot extends WorkerSlot implements Serializable {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 
-    public boolean compareToUserDefineWorker(WorkerAssignment worker,
-            Map<Integer, String> taskToComponent) {
+    public boolean compareToUserDefineWorker(WorkerAssignment worker, Map<Integer, String> taskToComponent) {
         int cpu = worker.getCpu();
         if (cpu != 0 && this.cpu != cpu)
             return false;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java
index eed0e39..1130b1a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java
@@ -36,8 +36,7 @@ public abstract class AbstractSelector implements Selector {
         this.context = context;
     }
 
-    protected List<ResourceWorkerSlot> selectWorker(
-            List<ResourceWorkerSlot> list, Comparator<ResourceWorkerSlot> c) {
+    protected List<ResourceWorkerSlot> selectWorker(List<ResourceWorkerSlot> list, Comparator<ResourceWorkerSlot> c) {
         List<ResourceWorkerSlot> result = new ArrayList<ResourceWorkerSlot>();
         ResourceWorkerSlot best = null;
         for (ResourceWorkerSlot worker : list) {
@@ -58,8 +57,7 @@ public abstract class AbstractSelector implements Selector {
     }
 
     @Override
-    public List<ResourceWorkerSlot> select(List<ResourceWorkerSlot> result,
-            String name) {
+    public List<ResourceWorkerSlot> select(List<ResourceWorkerSlot> result, String name) {
         if (result.size() == 1)
             return result;
         result = this.selectWorker(result, workerComparator.get(name));