You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:45 UTC
[07/51] [partial] storm git commit: Update JStorm to latest release 2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java
index 1d77089..7db0ed4 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/SimpleJStormMetric.java
@@ -17,80 +17,99 @@
*/
package com.alibaba.jstorm.metric;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.common.metric.Histogram;
-import com.alibaba.jstorm.common.metric.MetricRegistry;
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-public class SimpleJStormMetric extends JStormMetrics implements Runnable{
- private static final Logger LOG = LoggerFactory.getLogger(SimpleJStormMetric.class);
-
- protected static MetricRegistry metrics = JStormMetrics.workerMetrics;
- static {
- Metric.setEnable(true);
+import com.alibaba.jstorm.common.metric.*;
+import com.codahale.metrics.Gauge;
+
+/**
+ * simplified metrics, only for worker metrics. all metrics are logged locally without reporting to TM or nimbus.
+ *
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class SimpleJStormMetric extends JStormMetrics {
+ private static final long serialVersionUID = 7468005641982249536L;
+
+ protected static final AsmMetricRegistry metrics = JStormMetrics.getWorkerMetrics();
+
+ public static void updateNimbusHistogram(String name, Number obj) {
+ updateHistogram(NIMBUS_METRIC_KEY, name, obj);
}
-
- protected static SimpleJStormMetric instance = null;
-
-
- public static SimpleJStormMetric mkInstance() {
- synchronized (SimpleJStormMetric.class) {
- if (instance == null) {
- instance = new SimpleJStormMetric();
- }
-
- return instance;
- }
+
+ public static void updateSupervisorHistogram(String name, Number obj) {
+ updateHistogram(SUPERVISOR_METRIC_KEY, name, obj);
}
-
- protected SimpleJStormMetric() {
-
+
+ public static void updateNimbusMeter(String name, Number obj) {
+ updateMeter(NIMBUS_METRIC_KEY, name, obj);
+ }
+
+ public static void updateSupervisorMeter(String name, Number obj) {
+ updateMeter(SUPERVISOR_METRIC_KEY, name, obj);
+ }
+
+ public static void updateNimbusCounter(String name, Number obj) {
+ updateCounter(NIMBUS_METRIC_KEY, name, obj);
}
-
- public static Histogram registerHistorgram(String name) {
- return JStormMetrics.registerWorkerHistogram(name);
+
+ public static void updateSupervisorCounter(String name, Number obj) {
+ updateCounter(SUPERVISOR_METRIC_KEY, name, obj);
}
-
- public static void updateHistorgram(String name, Number obj) {
- LOG.debug(name + ":" + obj.doubleValue());
- Histogram histogram = (Histogram)metrics.getMetric(name);
+
+ public static void updateHistogram(String key, String name, Number obj) {
+ String formalName = MetricUtils.workerMetricName(key, host, 0, name, MetricType.HISTOGRAM);
+ AsmHistogram histogram = (AsmHistogram) metrics.getMetric(formalName);
if (histogram == null) {
- try {
- histogram = registerHistorgram(name);
- }catch(Exception e) {
- LOG.info("{} has been register", name);
- return;
- }
+ histogram = registerHistogram(name);
}
-
+
histogram.update(obj);
-
}
- @Override
- public void run() {
- // TODO Auto-generated method stub
- Map<String, Metric> map = metrics.getMetrics();
-
- for (Entry<String, Metric> entry : map.entrySet()) {
- String key = entry.getKey();
- Metric metric = entry.getValue();
-
- LOG.info(key + ":" + metric.getSnapshot());
+ public static void updateMeter(String key, String name, Number obj) {
+ String formalName = MetricUtils.workerMetricName(key, host, 0, name, MetricType.METER);
+ AsmMeter meter = (AsmMeter) metrics.getMetric(formalName);
+ if (meter == null) {
+ meter = registerMeter(name);
}
+
+ meter.update(obj);
+ }
+
+ public static void updateCounter(String key, String name, Number obj) {
+ String formalName = MetricUtils.workerMetricName(key, host, 0, name, MetricType.COUNTER);
+ AsmCounter counter = (AsmCounter) metrics.getMetric(formalName);
+ if (counter == null) {
+ counter = registerCounter(name);
+ }
+
+ counter.update(obj);
+ }
+
+ private static AsmGauge registerGauge(Gauge<Double> gauge, String name) {
+ AsmGauge gauge1 = new AsmGauge(gauge);
+ gauge1.setOp(AsmMetric.MetricOp.LOG);
+
+ return registerWorkerGauge(topologyId, name, gauge1);
}
-
-
- public static void main(String[] args) {
- updateHistorgram("test", 11100.0);
-
- SimpleJStormMetric instance = new SimpleJStormMetric();
-
- instance.run();
+
+ private static AsmHistogram registerHistogram(String name) {
+ AsmHistogram histogram = new AsmHistogram();
+ histogram.setOp(AsmMetric.MetricOp.LOG);
+
+ return registerWorkerHistogram(NIMBUS_METRIC_KEY, name, histogram);
+ }
+
+ public static AsmMeter registerMeter(String name) {
+ AsmMeter meter = new AsmMeter();
+ meter.setOp(AsmMetric.MetricOp.LOG);
+
+ return registerWorkerMeter(NIMBUS_METRIC_KEY, name, meter);
+ }
+
+ public static AsmCounter registerCounter(String name) {
+ AsmCounter counter = new AsmCounter();
+ counter.setOp(AsmMetric.MetricOp.LOG);
+
+ return registerWorkerCounter(NIMBUS_METRIC_KEY, name, counter);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TimeTicker.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TimeTicker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TimeTicker.java
new file mode 100644
index 0000000..e44e7b5
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TimeTicker.java
@@ -0,0 +1,52 @@
+package com.alibaba.jstorm.metric;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * a simple util class to calculate run time
+ *
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class TimeTicker {
+ private TimeUnit unit;
+ private long start;
+ private long end;
+
+ public TimeTicker(TimeUnit unit) {
+ if (unit != TimeUnit.NANOSECONDS && unit != TimeUnit.MILLISECONDS) {
+ throw new IllegalArgumentException("invalid unit!");
+ }
+ this.unit = unit;
+ }
+
+ public TimeTicker(TimeUnit unit, boolean start) {
+ this(unit);
+ if (start) {
+ start();
+ }
+ }
+
+ public void start() {
+ if (unit == TimeUnit.MILLISECONDS) {
+ this.start = System.currentTimeMillis();
+ } else if (unit == TimeUnit.NANOSECONDS) {
+ this.start = System.nanoTime();
+ }
+ }
+
+ public long stop() {
+ if (unit == TimeUnit.MILLISECONDS) {
+ this.end = System.currentTimeMillis();
+ } else if (unit == TimeUnit.NANOSECONDS) {
+ this.end = System.nanoTime();
+ }
+ return end - start;
+ }
+
+ public long stopAndRestart() {
+ long elapsed = stop();
+ start();
+ return elapsed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TopologyMetricContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TopologyMetricContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TopologyMetricContext.java
new file mode 100644
index 0000000..8dae281
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/TopologyMetricContext.java
@@ -0,0 +1,528 @@
+package com.alibaba.jstorm.metric;
+
+import backtype.storm.generated.MetricInfo;
+import backtype.storm.generated.MetricSnapshot;
+import backtype.storm.generated.TopologyMetric;
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A topology metric context contains all in-memory metric data of a topology.
+ * This class resides in TopologyMaster.
+ *
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class TopologyMetricContext {
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private Set<ResourceWorkerSlot> workerSet;
+ private int taskNum = 1;
+ private ConcurrentMap<String, MetricInfo> memCache = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Long> memMeta = new ConcurrentHashMap<>();
+ private final AtomicBoolean isMerging = new AtomicBoolean(false);
+ private String topologyId;
+ private volatile int flushedMetaNum = 0;
+
+ /**
+ * sync meta from metric cache on startup
+ */
+ private volatile boolean syncMeta = false;
+
+ private Map conf;
+
+ public TopologyMetricContext() {
+ }
+
+ public TopologyMetricContext(Set<ResourceWorkerSlot> workerSet) {
+ this.workerSet = workerSet;
+ }
+
+ public TopologyMetricContext(String topologyId, Set<ResourceWorkerSlot> workerSet, Map conf) {
+ this(workerSet);
+ this.topologyId = topologyId;
+ this.conf = conf;
+ }
+
+ public ConcurrentMap<String, Long> getMemMeta() {
+ return memMeta;
+ }
+
+ public String getTopologyId() {
+ return topologyId;
+ }
+
+ public void setTopologyId(String topologyId) {
+ this.topologyId = topologyId;
+ }
+
+ public boolean syncMeta() {
+ return syncMeta;
+ }
+
+ public void setSyncMeta(boolean syncMeta) {
+ this.syncMeta = syncMeta;
+ }
+
+ public int getTaskNum() {
+ return taskNum;
+ }
+
+ public void setTaskNum(int taskNum) {
+ this.taskNum = taskNum;
+ }
+
+ public int getFlushedMetaNum() {
+ return flushedMetaNum;
+ }
+
+ public void setFlushedMetaNum(int flushedMetaNum) {
+ this.flushedMetaNum = flushedMetaNum;
+ }
+
+ public ReentrantLock getLock() {
+ return lock;
+ }
+
+ public int getWorkerNum() {
+ return workerSet.size();
+ }
+
+ public void setWorkerSet(Set<ResourceWorkerSlot> workerSet) {
+ this.workerSet = workerSet;
+ }
+
+ public void resetUploadedMetrics() {
+ this.memCache.clear();
+ }
+
+ public final ConcurrentMap<String, MetricInfo> getMemCache() {
+ return memCache;
+ }
+
+ public void addToMemCache(String workerSlot, MetricInfo metricInfo) {
+ memCache.put(workerSlot, metricInfo);
+ LOG.info("update mem cache, worker:{}, total uploaded:{}", workerSlot, memCache.size());
+ }
+
+ public boolean readyToUpload() {
+ return memCache.size() >= workerSet.size();
+ }
+
+ public boolean isMerging() {
+ return isMerging.get();
+ }
+
+ public void setMerging(boolean isMerging) {
+ this.isMerging.set(isMerging);
+ }
+
+ public int getUploadedWorkerNum() {
+ return memCache.size();
+ }
+
+ public TopologyMetric mergeMetrics() {
+ long start = System.currentTimeMillis();
+
+ if (getMemCache().size() == 0) {
+ //LOG.info("topology:{}, metric size is 0, skip...", topologyId);
+ return null;
+ }
+ if (isMerging()) {
+ LOG.info("topology {} is already merging, skip...", topologyId);
+ return null;
+ }
+
+ setMerging(true);
+
+ try {
+ Map<String, MetricInfo> workerMetricMap = this.memCache;
+ // reset mem cache
+ this.memCache = new ConcurrentHashMap<>();
+
+ MetricInfo topologyMetrics = MetricUtils.mkMetricInfo();
+ MetricInfo componentMetrics = MetricUtils.mkMetricInfo();
+ MetricInfo taskMetrics = MetricUtils.mkMetricInfo();
+ MetricInfo streamMetrics = MetricUtils.mkMetricInfo();
+ MetricInfo workerMetrics = MetricUtils.mkMetricInfo();
+ MetricInfo nettyMetrics = MetricUtils.mkMetricInfo();
+ TopologyMetric tpMetric =
+ new TopologyMetric(topologyMetrics, componentMetrics, workerMetrics, taskMetrics, streamMetrics, nettyMetrics);
+
+
+ // metric name => worker count
+ Map<String, Integer> metricNameCounters = new HashMap<>();
+
+ // special for histograms & timers, we merge the points to get a new snapshot data.
+ Map<String, Map<Integer, Histogram>> histograms = new HashMap<>();
+ Map<String, Map<Integer, Timer>> timers = new HashMap<>();
+
+ // iterate metrics of all workers within the same topology
+ for (ConcurrentMap.Entry<String, MetricInfo> metricEntry : workerMetricMap.entrySet()) {
+ MetricInfo metricInfo = metricEntry.getValue();
+
+ // merge counters: add old and new values, note we only add incoming new metrics and overwrite
+ // existing data, same for all below.
+ Map<String, Map<Integer, MetricSnapshot>> metrics = metricInfo.get_metrics();
+ for (Map.Entry<String, Map<Integer, MetricSnapshot>> metric : metrics.entrySet()) {
+ String metricName = metric.getKey();
+ Map<Integer, MetricSnapshot> data = metric.getValue();
+ MetaType metaType = MetricUtils.metaType(metricName);
+
+ MetricType metricType = MetricUtils.metricType(metricName);
+ if (metricType == MetricType.COUNTER) {
+ mergeCounters(tpMetric, metaType, metricName, data);
+ } else if (metricType == MetricType.GAUGE) {
+ mergeGauges(tpMetric, metaType, metricName, data);
+ } else if (metricType == MetricType.METER) {
+ mergeMeters(getMetricInfoByType(tpMetric, metaType), metricName, data, metricNameCounters);
+ } else if (metricType == MetricType.HISTOGRAM) {
+ mergeHistograms(getMetricInfoByType(tpMetric, metaType),
+ metricName, data, metricNameCounters, histograms);
+ } else if (metricType == MetricType.TIMER) {
+ mergeTimers(getMetricInfoByType(tpMetric, metaType),
+ metricName, data, metricNameCounters, timers);
+ }
+ }
+ }
+ adjustHistogramTimerMetrics(tpMetric, metricNameCounters, histograms, timers);
+ // for counters, we only report delta data every time, need to sum with old data
+ //adjustCounterMetrics(tpMetric, oldTpMetric);
+
+ LOG.info("merge topology metrics:{}, cost:{}", topologyId, System.currentTimeMillis() - start);
+ // debug logs
+ //MetricUtils.printMetricWinSize(componentMetrics);
+
+ return tpMetric;
+ } finally {
+ setMerging(false);
+ }
+ }
+
+
+ protected MetricInfo getMetricInfoByType(TopologyMetric topologyMetric, MetaType type) {
+ if (type == MetaType.TASK) {
+ return topologyMetric.get_taskMetric();
+ } else if (type == MetaType.WORKER) {
+ return topologyMetric.get_workerMetric();
+ } else if (type == MetaType.COMPONENT) {
+ return topologyMetric.get_componentMetric();
+ } else if (type == MetaType.STREAM) {
+ return topologyMetric.get_streamMetric();
+ } else if (type == MetaType.NETTY) {
+ return topologyMetric.get_nettyMetric();
+ } else if (type == MetaType.TOPOLOGY) {
+ return topologyMetric.get_topologyMetric();
+ }
+ return null;
+ }
+
+ public void mergeCounters(TopologyMetric tpMetric, MetaType metaType, String meta,
+ Map<Integer, MetricSnapshot> data) {
+ MetricInfo metricInfo = getMetricInfoByType(tpMetric, metaType);
+ Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
+ if (existing == null) {
+ metricInfo.put_to_metrics(meta, data);
+ } else {
+ for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+ Integer win = dataEntry.getKey();
+ MetricSnapshot snapshot = dataEntry.getValue();
+ MetricSnapshot old = existing.get(win);
+ if (old == null) {
+ existing.put(win, snapshot);
+ } else {
+ old.set_ts(snapshot.get_ts());
+ old.set_longValue(old.get_longValue() + snapshot.get_longValue());
+ }
+ }
+ }
+ }
+
+ public void mergeGauges(TopologyMetric tpMetric, MetaType metaType, String meta,
+ Map<Integer, MetricSnapshot> data) {
+ MetricInfo metricInfo = getMetricInfoByType(tpMetric, metaType);
+ Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
+ if (existing == null) {
+ metricInfo.put_to_metrics(meta, data);
+ } else {
+ for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+ Integer win = dataEntry.getKey();
+ MetricSnapshot snapshot = dataEntry.getValue();
+ MetricSnapshot old = existing.get(win);
+ if (old == null) {
+ existing.put(win, snapshot);
+ } else {
+ if (snapshot.get_ts() >= old.get_ts()) {
+ old.set_ts(snapshot.get_ts());
+ if (metaType != MetaType.TOPOLOGY) {
+ old.set_doubleValue(snapshot.get_doubleValue());
+ } else { // for topology metric, gauge might be add-able, e.g., cpu, memory, etc.
+ old.set_doubleValue(old.get_doubleValue() + snapshot.get_doubleValue());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * meters are not sampled.
+ */
+ public void mergeMeters(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data,
+ Map<String, Integer> metaCounters) {
+ Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
+ if (existing == null) {
+ metricInfo.put_to_metrics(meta, data);
+ } else {
+ for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+ Integer win = dataEntry.getKey();
+ MetricSnapshot snapshot = dataEntry.getValue();
+ MetricSnapshot old = existing.get(win);
+ if (old == null) {
+ existing.put(win, snapshot);
+ } else {
+ if (snapshot.get_ts() >= old.get_ts()) {
+ old.set_ts(snapshot.get_ts());
+ old.set_mean(old.get_mean() + snapshot.get_mean());
+ old.set_m1(old.get_m1() + snapshot.get_m1());
+ old.set_m5(old.get_m5() + snapshot.get_m5());
+ old.set_m15(old.get_m15() + snapshot.get_m15());
+ }
+ }
+ }
+ }
+ updateMetricCounters(meta, metaCounters);
+ }
+
+ /**
+ * histograms are sampled, but we just update points
+ */
+ public void mergeHistograms(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data,
+ Map<String, Integer> metaCounters, Map<String, Map<Integer, Histogram>> histograms) {
+ Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
+ if (existing == null) {
+ metricInfo.put_to_metrics(meta, data);
+ Map<Integer, Histogram> histogramMap = new HashMap<>();
+ for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+ Histogram histogram = MetricUtils.metricSnapshot2Histogram(dataEntry.getValue());
+ histogramMap.put(dataEntry.getKey(), histogram);
+ }
+ histograms.put(meta, histogramMap);
+ } else {
+ for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+ Integer win = dataEntry.getKey();
+ MetricSnapshot snapshot = dataEntry.getValue();
+ MetricSnapshot old = existing.get(win);
+ if (old == null) {
+ existing.put(win, snapshot);
+ histograms.get(meta).put(win, MetricUtils.metricSnapshot2Histogram(snapshot));
+ } else {
+ if (snapshot.get_ts() >= old.get_ts()) {
+ old.set_ts(snapshot.get_ts());
+ // update points
+ MetricUtils.updateHistogramPoints(histograms.get(meta).get(win), snapshot.get_points());
+ }
+ }
+ }
+ }
+ updateMetricCounters(meta, metaCounters);
+ }
+
+ /**
+ * timers are sampled, we just update points
+ */
+ public void mergeTimers(MetricInfo metricInfo, String meta, Map<Integer, MetricSnapshot> data,
+ Map<String, Integer> metaCounters, Map<String, Map<Integer, Timer>> timers) {
+ Map<Integer, MetricSnapshot> existing = metricInfo.get_metrics().get(meta);
+ if (existing == null) {
+ metricInfo.put_to_metrics(meta, data);
+ Map<Integer, Timer> timerMap = new HashMap<>();
+ for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+ Timer timer = MetricUtils.metricSnapshot2Timer(dataEntry.getValue());
+ timerMap.put(dataEntry.getKey(), timer);
+ }
+ timers.put(meta, timerMap);
+ } else {
+ for (Map.Entry<Integer, MetricSnapshot> dataEntry : data.entrySet()) {
+ Integer win = dataEntry.getKey();
+ MetricSnapshot snapshot = dataEntry.getValue();
+ MetricSnapshot old = existing.get(win);
+ if (old == null) {
+ existing.put(win, snapshot);
+ timers.get(meta).put(win, MetricUtils.metricSnapshot2Timer(snapshot));
+ } else {
+ if (snapshot.get_ts() >= old.get_ts()) {
+ old.set_ts(snapshot.get_ts());
+ old.set_m1(old.get_m1() + snapshot.get_m1());
+ old.set_m5(old.get_m5() + snapshot.get_m5());
+ old.set_m15(old.get_m15() + snapshot.get_m15());
+
+ // update points
+ MetricUtils.updateTimerPoints(timers.get(meta).get(win), snapshot.get_points());
+ }
+ }
+ }
+ }
+ updateMetricCounters(meta, metaCounters);
+ }
+
+ /**
+ * computes occurrences of specified metric name
+ */
+ protected void updateMetricCounters(String metricName, Map<String, Integer> metricNameCounters) {
+ if (metricNameCounters.containsKey(metricName)) {
+ metricNameCounters.put(metricName, metricNameCounters.get(metricName) + 1);
+ } else {
+ metricNameCounters.put(metricName, 1);
+ }
+ }
+
+ protected void adjustHistogramTimerMetrics(TopologyMetric tpMetric, Map<String, Integer> metaCounters,
+ Map<String, Map<Integer, Histogram>> histograms,
+ Map<String, Map<Integer, Timer>> timers) {
+ resetPoints(tpMetric.get_taskMetric().get_metrics());
+ resetPoints(tpMetric.get_streamMetric().get_metrics());
+ resetPoints(tpMetric.get_nettyMetric().get_metrics());
+ resetPoints(tpMetric.get_workerMetric().get_metrics());
+
+ Map<String, Map<Integer, MetricSnapshot>> compMetrics =
+ tpMetric.get_componentMetric().get_metrics();
+ Map<String, Map<Integer, MetricSnapshot>> topologyMetrics =
+ tpMetric.get_topologyMetric().get_metrics();
+
+ adjustMetrics(compMetrics, metaCounters, histograms, timers);
+ adjustMetrics(topologyMetrics, metaCounters, histograms, timers);
+ }
+
+ private void adjustMetrics(Map<String, Map<Integer, MetricSnapshot>> metrics, Map<String, Integer> metaCounters,
+ Map<String, Map<Integer, Histogram>> histograms, Map<String, Map<Integer, Timer>> timers) {
+ for (Map.Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metrics.entrySet()) {
+ String meta = metricEntry.getKey();
+ MetricType metricType = MetricUtils.metricType(meta);
+ MetaType metaType = MetricUtils.metaType(meta);
+ Map<Integer, MetricSnapshot> winData = metricEntry.getValue();
+
+ if (metricType == MetricType.HISTOGRAM) {
+ for (Map.Entry<Integer, MetricSnapshot> dataEntry : winData.entrySet()) {
+ MetricSnapshot snapshot = dataEntry.getValue();
+ Integer cnt = metaCounters.get(meta);
+ Histogram histogram = histograms.get(meta).get(dataEntry.getKey());
+ if (cnt != null && cnt > 1) {
+
+ Snapshot snapshot1 = histogram.getSnapshot();
+ snapshot.set_mean(snapshot1.getMean());
+ snapshot.set_p50(snapshot1.getMedian());
+ snapshot.set_p75(snapshot1.get75thPercentile());
+ snapshot.set_p95(snapshot1.get95thPercentile());
+ snapshot.set_p98(snapshot1.get98thPercentile());
+ snapshot.set_p99(snapshot1.get99thPercentile());
+ snapshot.set_p999(snapshot1.get999thPercentile());
+ snapshot.set_stddev(snapshot1.getStdDev());
+ snapshot.set_min(snapshot1.getMin());
+ snapshot.set_max(snapshot1.getMax());
+
+ if (metaType == MetaType.TOPOLOGY) {
+ snapshot.set_points(Arrays.asList(ArrayUtils.toObject(snapshot1.getValues())));
+ }
+ }
+ if (metaType != MetaType.TOPOLOGY) {
+ snapshot.set_points(new ArrayList<Long>(0));
+ }
+ }
+
+ } else if (metricType == MetricType.TIMER) {
+ for (Map.Entry<Integer, MetricSnapshot> dataEntry : winData.entrySet()) {
+ MetricSnapshot snapshot = dataEntry.getValue();
+ Integer cnt = metaCounters.get(meta);
+ if (cnt != null && cnt > 1) {
+ Timer timer = timers.get(meta).get(dataEntry.getKey());
+ Snapshot snapshot1 = timer.getSnapshot();
+ snapshot.set_p50(snapshot1.getMedian());
+ snapshot.set_p75(snapshot1.get75thPercentile());
+ snapshot.set_p95(snapshot1.get95thPercentile());
+ snapshot.set_p98(snapshot1.get98thPercentile());
+ snapshot.set_p99(snapshot1.get99thPercentile());
+ snapshot.set_p999(snapshot1.get999thPercentile());
+ snapshot.set_stddev(snapshot1.getStdDev());
+ snapshot.set_min(snapshot1.getMin());
+ snapshot.set_max(snapshot1.getMax());
+ }
+ snapshot.set_points(new ArrayList<Long>(0));
+ }
+ }
+ }
+ }
+
+ private void resetPoints(Map<String, Map<Integer, MetricSnapshot>> metrics) {
+ for (Map.Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metrics.entrySet()) {
+ String meta = metricEntry.getKey();
+ MetricType metricType = MetricUtils.metricType(meta);
+ Map<Integer, MetricSnapshot> winData = metricEntry.getValue();
+
+ if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) {
+ for (MetricSnapshot snapshot : winData.values()) {
+ snapshot.set_points(new ArrayList<Long>(0));
+ }
+ }
+ }
+ }
+
+ protected void adjustCounterMetrics(TopologyMetric tpMetric, TopologyMetric oldMetric) {
+ if (oldMetric != null) {
+ mergeCounters(tpMetric.get_streamMetric().get_metrics(),
+ oldMetric.get_streamMetric().get_metrics());
+
+ mergeCounters(tpMetric.get_taskMetric().get_metrics(),
+ oldMetric.get_taskMetric().get_metrics());
+
+ mergeCounters(tpMetric.get_componentMetric().get_metrics(),
+ oldMetric.get_componentMetric().get_metrics());
+
+ mergeCounters(tpMetric.get_workerMetric().get_metrics(),
+ oldMetric.get_workerMetric().get_metrics());
+
+ mergeCounters(tpMetric.get_nettyMetric().get_metrics(),
+ oldMetric.get_nettyMetric().get_metrics());
+ }
+ }
+
+ /**
+ * sum old counter snapshots and new counter snapshots, sums are stored in new snapshots.
+ */
+ private void mergeCounters(Map<String, Map<Integer, MetricSnapshot>> newCounters,
+ Map<String, Map<Integer, MetricSnapshot>> oldCounters) {
+ for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : newCounters.entrySet()) {
+ String metricName = entry.getKey();
+ Map<Integer, MetricSnapshot> snapshots = entry.getValue();
+ Map<Integer, MetricSnapshot> oldSnapshots = oldCounters.get(metricName);
+ if (oldSnapshots != null && oldSnapshots.size() > 0) {
+ for (Map.Entry<Integer, MetricSnapshot> snapshotEntry : snapshots.entrySet()) {
+ Integer win = snapshotEntry.getKey();
+ MetricSnapshot snapshot = snapshotEntry.getValue();
+ MetricSnapshot oldSnapshot = oldSnapshots.get(win);
+ if (oldSnapshot != null) {
+ snapshot.set_longValue(snapshot.get_longValue() + oldSnapshot.get_longValue());
+ }
+ }
+ }
+ }
+ }
+
+ private double getSampleRate() {
+ return ConfigExtension.getMetricSampleRate(conf);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java
index eabcd44..71e7cac 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEvent.java
@@ -31,11 +31,10 @@ public class JstormEvent {
this.msgId = msgId;
}
- public final static EventFactory<JstormEvent> EVENT_FACTORY =
- new EventFactory<JstormEvent>() {
- public JstormEvent newInstance() {
- return new JstormEvent();
- }
- };
+ public final static EventFactory<JstormEvent> EVENT_FACTORY = new EventFactory<JstormEvent>() {
+ public JstormEvent newInstance() {
+ return new JstormEvent();
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java
index dfc43d1..107441d 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/queue/disruptor/JstormEventHandler.java
@@ -33,8 +33,7 @@ public class JstormEventHandler implements EventHandler {
}
@Override
- public void onEvent(Object event, long sequence, boolean endOfBatch)
- throws Exception {
+ public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
long msgId = Long.parseLong(((JstormEvent) event).getMsgId());
// if (msgId % size ==0) {
// logger.warn("consumer msgId=" + msgId + ", seq=" + sequence);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java
index 69620bd..8a05662 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/Assignment.java
@@ -29,31 +29,28 @@ import org.apache.commons.lang.builder.ToStringStyle;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
/**
- * Assignment of one Toplogy, stored in /ZK-DIR/assignments/{topologyid}
- * nodeHost {supervisorid: hostname} -- assigned supervisor Map
- * taskStartTimeSecs: {taskid, taskStartSeconds} masterCodeDir: topology source
- * code's dir in Nimbus taskToResource: {taskid, ResourceAssignment}
+ * Assignment of one Toplogy, stored in /ZK-DIR/assignments/{topologyid} nodeHost {supervisorid: hostname} -- assigned supervisor Map taskStartTimeSecs:
+ * {taskid, taskStartSeconds} masterCodeDir: topology source code's dir in Nimbus taskToResource: {taskid, ResourceAssignment}
*
* @author Lixin/Longda
*/
public class Assignment implements Serializable {
public enum AssignmentType {
- Assign, Config
+ Assign, UpdateTopology, ScaleTopology
}
private static final long serialVersionUID = 6087667851333314069L;
private final String masterCodeDir;
/**
- * @@@ nodeHost store <supervisorId, hostname>, this will waste some zk
- * storage
+ * @@@ nodeHost store <supervisorId, hostname>, this will waste some zk storage
*/
private final Map<String, String> nodeHost;
private final Map<Integer, Integer> taskStartTimeSecs;
private final Set<ResourceWorkerSlot> workers;
private long timeStamp;
-
+
private AssignmentType type;
public Assignment() {
@@ -64,10 +61,8 @@ public class Assignment implements Serializable {
this.timeStamp = System.currentTimeMillis();
this.type = AssignmentType.Assign;
}
-
- public Assignment(String masterCodeDir, Set<ResourceWorkerSlot> workers,
- Map<String, String> nodeHost,
- Map<Integer, Integer> taskStartTimeSecs) {
+
+ public Assignment(String masterCodeDir, Set<ResourceWorkerSlot> workers, Map<String, String> nodeHost, Map<Integer, Integer> taskStartTimeSecs) {
this.workers = workers;
this.nodeHost = nodeHost;
this.taskStartTimeSecs = taskStartTimeSecs;
@@ -79,11 +74,11 @@ public class Assignment implements Serializable {
public void setAssignmentType(AssignmentType type) {
this.type = type;
}
-
+
public AssignmentType getAssignmentType() {
return type;
}
-
+
public Map<String, String> getNodeHost() {
return nodeHost;
}
@@ -106,11 +101,9 @@ public class Assignment implements Serializable {
* @param supervisorId
* @return Map<Integer, WorkerSlot>
*/
- public Map<Integer, ResourceWorkerSlot> getTaskToNodePortbyNode(
- String supervisorId) {
+ public Map<Integer, ResourceWorkerSlot> getTaskToNodePortbyNode(String supervisorId) {
- Map<Integer, ResourceWorkerSlot> result =
- new HashMap<Integer, ResourceWorkerSlot>();
+ Map<Integer, ResourceWorkerSlot> result = new HashMap<Integer, ResourceWorkerSlot>();
for (ResourceWorkerSlot worker : workers) {
if (worker.getNodeId().equals(supervisorId)) {
result.put(worker.getPort(), worker);
@@ -144,8 +137,7 @@ public class Assignment implements Serializable {
public Set<Integer> getCurrentWorkerTasks(String supervisorId, int port) {
for (ResourceWorkerSlot worker : workers) {
- if (worker.getNodeId().equals(supervisorId)
- && worker.getPort() == port)
+ if (worker.getNodeId().equals(supervisorId) && worker.getPort() == port)
return worker.getTasks();
}
@@ -164,26 +156,24 @@ public class Assignment implements Serializable {
return this.timeStamp;
}
+ public boolean isTopologyChange(long oldTimeStamp) {
+ boolean isChange = false;
+ if (timeStamp > oldTimeStamp && (type.equals(AssignmentType.UpdateTopology) || type.equals(AssignmentType.ScaleTopology)))
+ isChange = true;
+ return isChange;
+ }
+
public void updateTimeStamp() {
timeStamp = System.currentTimeMillis();
}
-
+
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
- result =
- prime
- * result
- + ((masterCodeDir == null) ? 0 : masterCodeDir
- .hashCode());
- result =
- prime * result + ((nodeHost == null) ? 0 : nodeHost.hashCode());
- result =
- prime
- * result
- + ((taskStartTimeSecs == null) ? 0 : taskStartTimeSecs
- .hashCode());
+ result = prime * result + ((masterCodeDir == null) ? 0 : masterCodeDir.hashCode());
+ result = prime * result + ((nodeHost == null) ? 0 : nodeHost.hashCode());
+ result = prime * result + ((taskStartTimeSecs == null) ? 0 : taskStartTimeSecs.hashCode());
result = prime * result + ((workers == null) ? 0 : workers.hashCode());
result = prime * result + (int) (timeStamp & 0xFFFFFFFF);
return result;
@@ -225,8 +215,7 @@ public class Assignment implements Serializable {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java
index 3e7a770..4b9bbb3 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/AssignmentBak.java
@@ -32,8 +32,7 @@ public class AssignmentBak implements Serializable {
private final Map<String, List<Integer>> componentTasks;
private final Assignment assignment;
- public AssignmentBak(Map<String, List<Integer>> componentTasks,
- Assignment assignment) {
+ public AssignmentBak(Map<String, List<Integer>> componentTasks, Assignment assignment) {
super();
this.componentTasks = componentTasks;
this.assignment = assignment;
@@ -49,7 +48,6 @@ public class AssignmentBak implements Serializable {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java
index d73adfd..77bb883 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/CleanRunnable.java
@@ -62,8 +62,7 @@ public class CleanRunnable implements Runnable {
try {
f.delete();
} catch (Exception e) {
- log.error("Cleaning inbox ... error deleting:"
- + f.getName() + "," + e);
+ log.error("Cleaning inbox ... error deleting:" + f.getName() + "," + e);
}
} else {
clean(f);
@@ -72,8 +71,7 @@ public class CleanRunnable implements Runnable {
try {
f.delete();
} catch (Exception e) {
- log.error("Cleaning inbox ... error deleting:"
- + f.getName() + "," + e);
+ log.error("Cleaning inbox ... error deleting:" + f.getName() + "," + e);
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java
index 14d38d8..a9683fd 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/DelayEventRunnable.java
@@ -28,8 +28,7 @@ public class DelayEventRunnable implements Runnable {
private StatusType status;
private Object[] args;
- public DelayEventRunnable(NimbusData data, String topologyid,
- StatusType status, Object[] args) {
+ public DelayEventRunnable(NimbusData data, String topologyid, StatusType status, Object[] args) {
this.data = data;
this.topologyid = topologyid;
this.status = status;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java
index e62c61b..d87b65e 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/FollowerRunnable.java
@@ -18,21 +18,21 @@
package com.alibaba.jstorm.schedule;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.commons.io.FileExistsException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
@@ -44,10 +44,12 @@ import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.NetWorkUtils;
import com.alibaba.jstorm.utils.PathUtils;
+import backtype.storm.Config;
+import backtype.storm.utils.Utils;
+
public class FollowerRunnable implements Runnable {
- private static final Logger LOG = LoggerFactory
- .getLogger(FollowerRunnable.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FollowerRunnable.class);
private NimbusData data;
@@ -59,50 +61,62 @@ public class FollowerRunnable implements Runnable {
private final String hostPort;
+ public static final String NIMBUS_DIFFER_COUNT_ZK = "nimbus.differ.count.zk";
+
+ public static final Integer SLAVE_NIMBUS_WAIT_TIME = 120;
+
@SuppressWarnings("unchecked")
public FollowerRunnable(final NimbusData data, int sleepTime) {
this.data = data;
this.sleepTime = sleepTime;
+
if (!ConfigExtension.isNimbusUseIp(data.getConf())) {
- this.hostPort =
- NetWorkUtils.hostname()
- + ":"
- + String.valueOf(Utils.getInt(data.getConf().get(
- Config.NIMBUS_THRIFT_PORT)));
+ this.hostPort = NetWorkUtils.hostname() + ":" + String.valueOf(Utils.getInt(data.getConf().get(Config.NIMBUS_THRIFT_PORT)));
} else {
- this.hostPort =
- NetWorkUtils.ip()
- + ":"
- + String.valueOf(Utils.getInt(data.getConf().get(
- Config.NIMBUS_THRIFT_PORT)));
+ this.hostPort = NetWorkUtils.ip() + ":" + String.valueOf(Utils.getInt(data.getConf().get(Config.NIMBUS_THRIFT_PORT)));
}
try {
+
String[] hostfigs = this.hostPort.split(":");
boolean isLocaliP = false;
- if(hostfigs.length > 0){
+ if (hostfigs.length > 0) {
isLocaliP = hostfigs[0].equals("127.0.0.1");
}
- if(isLocaliP){
+ if (isLocaliP) {
throw new Exception("the hostname which Nimbus get is localhost");
}
- }catch(Exception e1){
- LOG.error("get nimbus host error!", e1);
- throw new RuntimeException(e1);
- }
- try {
- this.tryToBeLeader(data.getConf());
} catch (Exception e1) {
- // TODO Auto-generated catch block
- LOG.error("try to be leader error.", e1);
+ LOG.error("get nimbus host error!", e1);
throw new RuntimeException(e1);
}
+
try {
- data.getStormClusterState().update_nimbus_slave(hostPort,
- data.uptime());
+ data.getStormClusterState().update_nimbus_slave(hostPort, data.uptime());
+ data.getStormClusterState().update_nimbus_detail(hostPort, null);
} catch (Exception e) {
LOG.error("register nimbus host fail!", e);
throw new RuntimeException();
}
+ try{
+ update_nimbus_detail();
+ }catch (Exception e){
+ LOG.error("register detail of nimbus fail!", e);
+ throw new RuntimeException();
+ }
+ try {
+ this.tryToBeLeader(data.getConf());
+ } catch (Exception e1) {
+ try {
+ data.getStormClusterState().unregister_nimbus_host(hostPort);
+ data.getStormClusterState().unregister_nimbus_detail(hostPort);
+ }catch (Exception e2){
+ LOG.info("due to task errors, so remove register nimbus infomation" );
+ }finally {
+ // TODO Auto-generated catch block
+ LOG.error("try to be leader error.", e1);
+ throw new RuntimeException(e1);
+ }
+ }
callback = new RunnableCallback() {
@Override
public void run() {
@@ -121,6 +135,8 @@ public class FollowerRunnable implements Runnable {
return true;
}
+ // Two nimbus running on the same node isn't allowed
+ // so just checks ip is enough here
String[] part = zkMaster.split(":");
return NetWorkUtils.equals(part[0], NetWorkUtils.ip());
}
@@ -143,20 +159,21 @@ public class FollowerRunnable implements Runnable {
if (data.isLeader() == true) {
if (isZkLeader == false) {
LOG.info("New ZK master is " + master);
- JStormUtils.halt_process(1,
- "Lose ZK master node, halt process");
+ JStormUtils.halt_process(1, "Lose ZK master node, halt process");
return;
}
}
if (isZkLeader == true) {
zkClusterState.unregister_nimbus_host(hostPort);
+ zkClusterState.unregister_nimbus_detail(hostPort);
data.setLeader(true);
continue;
}
check();
zkClusterState.update_nimbus_slave(hostPort, data.uptime());
+ update_nimbus_detail();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
continue;
@@ -178,14 +195,29 @@ public class FollowerRunnable implements Runnable {
StormClusterState clusterState = data.getStormClusterState();
try {
- String master_stormdist_root =
- StormConfig.masterStormdistRoot(data.getConf());
+ String master_stormdist_root = StormConfig.masterStormdistRoot(data.getConf());
- List<String> code_ids =
- PathUtils.read_dir_contents(master_stormdist_root);
+ List<String> code_ids = PathUtils.read_dir_contents(master_stormdist_root);
List<String> assignments_ids = clusterState.assignments(callback);
+ Map<String, Assignment> assignmentMap = new HashMap<String, Assignment>();
+ List<String> update_ids = new ArrayList<String>();
+ for (String id : assignments_ids) {
+ Assignment assignment = clusterState.assignment_info(id, null);
+ Long localCodeDownTS;
+ try {
+ Long tmp = StormConfig.read_nimbus_topology_timestamp(data.getConf(), id);
+ localCodeDownTS = (tmp == null ? 0L : tmp);
+ } catch (FileNotFoundException e) {
+ localCodeDownTS = 0L;
+ }
+ if (assignment != null && assignment.isTopologyChange(localCodeDownTS.longValue())) {
+ update_ids.add(id);
+ }
+ assignmentMap.put(id, assignment);
+ }
+
List<String> done_ids = new ArrayList<String>();
for (String id : code_ids) {
@@ -199,13 +231,15 @@ public class FollowerRunnable implements Runnable {
code_ids.remove(id);
}
+ //redownload topologyid which hava been updated;
+ assignments_ids.addAll(update_ids);
+
for (String topologyId : code_ids) {
deleteLocalTopology(topologyId);
}
for (String id : assignments_ids) {
- Assignment assignment = clusterState.assignment_info(id, null);
- downloadCodeFromMaster(assignment, id);
+ downloadCodeFromMaster(assignmentMap.get(id), id);
}
} catch (IOException e) {
// TODO Auto-generated catch block
@@ -219,8 +253,7 @@ public class FollowerRunnable implements Runnable {
}
private void deleteLocalTopology(String topologyId) throws IOException {
- String dir_to_delete =
- StormConfig.masterStormdistRoot(data.getConf(), topologyId);
+ String dir_to_delete = StormConfig.masterStormdistRoot(data.getConf(), topologyId);
try {
PathUtils.rmr(dir_to_delete);
LOG.info("delete:" + dir_to_delete + "successfully!");
@@ -230,47 +263,113 @@ public class FollowerRunnable implements Runnable {
}
}
- private void downloadCodeFromMaster(Assignment assignment, String topologyId)
- throws IOException, TException {
+ private void downloadCodeFromMaster(Assignment assignment, String topologyId) throws IOException, TException {
try {
- String localRoot =
- StormConfig.masterStormdistRoot(data.getConf(), topologyId);
- String tmpDir =
- StormConfig.masterInbox(data.getConf()) + "/"
- + UUID.randomUUID().toString();
+ String localRoot = StormConfig.masterStormdistRoot(data.getConf(), topologyId);
+ String tmpDir = StormConfig.masterInbox(data.getConf()) + "/" + UUID.randomUUID().toString();
String masterCodeDir = assignment.getMasterCodeDir();
- JStormServerUtils.downloadCodeFromMaster(data.getConf(), tmpDir,
- masterCodeDir, topologyId, false);
+ JStormServerUtils.downloadCodeFromMaster(data.getConf(), tmpDir, masterCodeDir, topologyId, false);
- FileUtils.moveDirectory(new File(tmpDir), new File(localRoot));
+ File srcDir = new File(tmpDir);
+ File destDir = new File(localRoot);
+ try {
+ FileUtils.moveDirectory(srcDir, destDir);
+ } catch (FileExistsException e) {
+ FileUtils.copyDirectory(srcDir, destDir);
+ FileUtils.deleteQuietly(srcDir);
+ }
+ // Update downloadCode timeStamp
+ StormConfig.write_nimbus_topology_timestamp(data.getConf(), topologyId, System.currentTimeMillis());
} catch (TException e) {
// TODO Auto-generated catch block
- LOG.error(e + " downloadStormCode failed " + "topologyId:"
- + topologyId + "masterCodeDir:"
- + assignment.getMasterCodeDir());
+ LOG.error(e + " downloadStormCode failed " + "topologyId:" + topologyId + "masterCodeDir:" + assignment.getMasterCodeDir());
throw e;
}
- LOG.info("Finished downloading code for topology id " + topologyId
- + " from " + assignment.getMasterCodeDir());
+ LOG.info("Finished downloading code for topology id " + topologyId + " from " + assignment.getMasterCodeDir());
}
private void tryToBeLeader(final Map conf) throws Exception {
- RunnableCallback masterCallback = new RunnableCallback() {
- @Override
- public void run() {
- try {
- tryToBeLeader(conf);
- } catch (Exception e) {
- LOG.error("To be master error", e);
- JStormUtils.halt_process(30,
- "Cant't to be master" + e.getMessage());
+ boolean allowed = check_nimbus_priority();
+
+ if (allowed){
+ RunnableCallback masterCallback = new RunnableCallback() {
+ @Override
+ public void run() {
+ try {
+ tryToBeLeader(conf);
+ } catch (Exception e) {
+ LOG.error("To be master error", e);
+ JStormUtils.halt_process(30, "Cant't to be master" + e.getMessage());
+ }
}
- }
- };
- data.getStormClusterState().try_to_be_leader(Cluster.MASTER_SUBTREE,
- hostPort, masterCallback);
+ };
+ LOG.info("This nimbus can be leader");
+ data.getStormClusterState().try_to_be_leader(Cluster.MASTER_SUBTREE, hostPort, masterCallback);
+ }else {
+ LOG.info("This nimbus can't be leader");
+ }
}
+ /**
+ * Compared with other nimbus ,get priority of this nimbus
+ *
+ * @throws Exception
+ */
+ private boolean check_nimbus_priority() throws Exception {
+
+ int gap = update_nimbus_detail();
+ if (gap == 0) {
+ return true;
+ }
+
+ int left = SLAVE_NIMBUS_WAIT_TIME;
+ while(left > 0) {
+ LOG.info( "After " + left + " seconds, nimbus will try to be Leader!");
+ Thread.sleep(10 * 1000);
+ left -= 10;
+ }
+
+ StormClusterState zkClusterState = data.getStormClusterState();
+
+ List<String> followers = zkClusterState.list_dirs(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE, false);
+ if (followers == null || followers.size() == 0) {
+ return false;
+ }
+ for (String follower : followers) {
+ if (follower != null && !follower.equals(hostPort)) {
+ Map bMap = zkClusterState.get_nimbus_detail(follower, false);
+ if (bMap != null){
+ Object object = bMap.get(NIMBUS_DIFFER_COUNT_ZK);
+ if (object != null && (JStormUtils.parseInt(object)) < gap){
+ LOG.info("Current node can't be leader, due to {} has higher priority", follower);
+ return false;
+ }
+ }
+ }
+ }
+
+
+
+ return true;
+ }
+ private int update_nimbus_detail() throws Exception {
+
+ //update count = count of zk's binary files - count of nimbus's binary files
+ StormClusterState zkClusterState = data.getStormClusterState();
+ String master_stormdist_root = StormConfig.masterStormdistRoot(data.getConf());
+ List<String> code_ids = PathUtils.read_dir_contents(master_stormdist_root);
+ List<String> assignments_ids = data.getStormClusterState().assignments(callback);
+ assignments_ids.removeAll(code_ids);
+
+ Map mtmp = zkClusterState.get_nimbus_detail(hostPort, false);
+ if (mtmp == null){
+ mtmp = new HashMap();
+ }
+ mtmp.put(NIMBUS_DIFFER_COUNT_ZK, assignments_ids.size());
+ zkClusterState.update_nimbus_detail(hostPort, mtmp);
+ LOG.debug("update nimbus's detail " + mtmp);
+ return assignments_ids.size();
+ }
/**
* Check whether current node is master or not
*
@@ -291,13 +390,11 @@ public class FollowerRunnable implements Runnable {
// current process own master
return;
}
- LOG.warn("Current Nimbus has start thrift, but fail to own zk master :"
- + zkHost);
+ LOG.warn("Current Nimbus has start thrift, but fail to own zk master :" + zkHost);
}
// current process doesn't own master
- String err =
- "Current Nimubs fail to own nimbus_master, should halt process";
+ String err = "Current Nimubs fail to own nimbus_master, should halt process";
LOG.error(err);
JStormUtils.halt_process(0, err);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java
index a9d9b92..a6e6093 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/IToplogyScheduler.java
@@ -26,6 +26,5 @@ import com.alibaba.jstorm.utils.FailedAssignTopologyException;
public interface IToplogyScheduler {
void prepare(Map conf);
- Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext contex)
- throws FailedAssignTopologyException;
+ Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext contex) throws FailedAssignTopologyException;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java
index 8342b79..b7c8e89 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/MonitorRunnable.java
@@ -17,27 +17,28 @@
*/
package com.alibaba.jstorm.schedule;
-import java.util.Date;
-import java.util.List;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import backtype.storm.Config;
+import backtype.storm.generated.TaskHeartbeat;
+import backtype.storm.generated.TopologyTaskHbInfo;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
+import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.TaskDeadEvent;
+
+import java.util.*;
/**
- *
- * Scan all task's heartbeat, if task isn't alive, DO
- * NimbusUtils.transition(monitor)
+ * Scan all task's heartbeat, if task isn't alive, DO NimbusUtils.transition(monitor)
*
* @author Longda
- *
*/
public class MonitorRunnable implements Runnable {
private static Logger LOG = LoggerFactory.getLogger(MonitorRunnable.class);
@@ -49,9 +50,7 @@ public class MonitorRunnable implements Runnable {
}
/**
- * @@@ Todo when one topology is being reassigned, the topology should be
- * skip check
- * @param data
+ * @@@ Todo when one topology is being reassigned, the topology should skip check
*/
@Override
public void run() {
@@ -80,44 +79,84 @@ public class MonitorRunnable implements Runnable {
LOG.info("Failed to get task ids of " + topologyid);
continue;
}
+ Assignment assignment = clusterState.assignment_info(topologyid, null);
+ Set<Integer> deadTasks = new HashSet<Integer>();
boolean needReassign = false;
for (Integer task : taskIds) {
- boolean isTaskDead =
- NimbusUtils.isTaskDead(data, topologyid, task);
+ boolean isTaskDead = NimbusUtils.isTaskDead(data, topologyid, task);
if (isTaskDead == true) {
- LOG.info("Found " + topologyid + ",taskid:" + task
- + " is dead");
-
- ResourceWorkerSlot resource = null;
- Assignment assignment =
- clusterState.assignment_info(topologyid, null);
- if (assignment != null)
- resource = assignment.getWorkerByTaskId(task);
- if (resource != null) {
- Date now = new Date();
- String nowStr = TimeFormat.getSecond(now);
- String errorInfo =
- "Task-" + task + " is dead on "
- + resource.getHostname() + ":"
- + resource.getPort() + ", "
- + nowStr;
- LOG.info(errorInfo);
- clusterState.report_task_error(topologyid, task,
- errorInfo);
- }
+ deadTasks.add(task);
needReassign = true;
- break;
}
}
+
+
+ TopologyTaskHbInfo topologyHbInfo = data.getTasksHeartbeat().get(topologyid);
if (needReassign == true) {
- NimbusUtils.transition(data, topologyid, false,
- StatusType.monitor);
+ if (topologyHbInfo != null) {
+ int topologyMasterId = topologyHbInfo.get_topologyMasterId();
+ if (deadTasks.contains(topologyMasterId)) {
+ deadTasks.clear();
+ if (assignment != null) {
+ ResourceWorkerSlot resource = assignment.getWorkerByTaskId(topologyMasterId);
+ if (resource != null)
+ deadTasks.addAll(resource.getTasks());
+ else
+ deadTasks.add(topologyMasterId);
+ }
+ } else {
+ Map<Integer, TaskHeartbeat> taskHbs = topologyHbInfo.get_taskHbs();
+ int launchTime = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_LAUNCH_SECS));
+ if (taskHbs == null || taskHbs.get(topologyMasterId) == null || taskHbs.get(topologyMasterId).get_uptime() < launchTime) {
+ /*try {
+ clusterState.topology_heartbeat(topologyid, topologyHbInfo);
+ } catch (Exception e) {
+ LOG.error("Failed to update task heartbeat info to ZK for " + topologyid, e);
+ }*/
+ return;
+ }
+ }
+ Map<Integer, ResourceWorkerSlot> deadTaskWorkers = new HashMap<>();
+ for (Integer task : deadTasks) {
+ LOG.info("Found " + topologyid + ",taskid:" + task + " is dead");
+
+ ResourceWorkerSlot resource = null;
+ if (assignment != null)
+ resource = assignment.getWorkerByTaskId(task);
+ if (resource != null) {
+ deadTaskWorkers.put(task, resource);
+ Date now = new Date();
+ String nowStr = TimeFormat.getSecond(now);
+ String errorInfo = "Task-" + task + " is dead on " + resource.getHostname() + ":" + resource.getPort() + ", " + nowStr;
+ LOG.info(errorInfo);
+ clusterState.report_task_error(topologyid, task, errorInfo, null);
+ }
+ }
+
+ if (deadTaskWorkers.size() > 0) {
+ // notify jstorm monitor
+ TaskDeadEvent event = new TaskDeadEvent();
+ event.clusterName = data.getClusterName();
+ event.topologyId = topologyid;
+ event.deadTasks = deadTaskWorkers;
+ event.timestamp = System.currentTimeMillis();
+ data.getMetricRunnable().pushEvent(event);
+ }
+ }
+ NimbusUtils.transition(data, topologyid, false, StatusType.monitor);
+ }
+
+ if (topologyHbInfo != null) {
+ try {
+ clusterState.topology_heartbeat(topologyid, topologyHbInfo);
+ } catch (Exception e) {
+ LOG.error("Failed to update task heartbeat info to ZK for " + topologyid, e);
+ }
}
}
} catch (Exception e) {
- // TODO Auto-generated catch block
LOG.error(e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java
index 12cdad0..9a3a879 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/TopologyAssignContext.java
@@ -38,6 +38,8 @@ public class TopologyAssignContext {
public static final int ASSIGN_TYPE_MONITOR = 2; // monitor a topology, some
// tasks are dead
+ protected String topologyId;
+
protected int assignType;
protected StormTopology rawTopology;
@@ -51,6 +53,9 @@ public class TopologyAssignContext {
protected Map<String, SupervisorInfo> cluster;
+ protected int topoMasterTaskId;
+ protected boolean assignSingleWorkerForTM = false;
+
protected Map<Integer, String> taskToComponent;
protected Set<Integer> allTaskIds; // all tasks
@@ -76,6 +81,17 @@ public class TopologyAssignContext {
this.deadTaskIds = copy.getDeadTaskIds();
this.unstoppedTaskIds = copy.getUnstoppedTaskIds();
this.isReassign = copy.isReassign();
+ this.topologyId = copy.getTopologyId();
+ this.topoMasterTaskId = copy.getTopologyMasterTaskId();
+ this.assignSingleWorkerForTM = copy.getAssignSingleWorkerForTM();
+ }
+
+ public String getTopologyId() {
+ return topologyId;
+ }
+
+ public void setTopologyId(String topologyId) {
+ this.topologyId = topologyId;
}
public int getAssignType() {
@@ -151,8 +167,7 @@ public class TopologyAssignContext {
}
public static boolean isAssignTypeValid(int type) {
- return (type == ASSIGN_TYPE_NEW) || (type == ASSIGN_TYPE_REBALANCE)
- || (type == ASSIGN_TYPE_MONITOR);
+ return (type == ASSIGN_TYPE_NEW) || (type == ASSIGN_TYPE_REBALANCE) || (type == ASSIGN_TYPE_MONITOR);
}
public Set<ResourceWorkerSlot> getUnstoppedWorkers() {
@@ -171,9 +186,24 @@ public class TopologyAssignContext {
this.isReassign = isReassign;
}
+ public int getTopologyMasterTaskId() {
+ return topoMasterTaskId;
+ }
+
+ public void setTopologyMasterTaskId(int taskId) {
+ this.topoMasterTaskId = taskId;
+ }
+
+ public boolean getAssignSingleWorkerForTM() {
+ return assignSingleWorkerForTM;
+ }
+
+ public void setAssignSingleWorkerForTM(boolean isAssign) {
+ this.assignSingleWorkerForTM = isAssign;
+ }
+
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java
index 9eb2775..78649aa 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyAssignContext.java
@@ -17,30 +17,20 @@
*/
package com.alibaba.jstorm.schedule.default_assign;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
import backtype.storm.Config;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
import backtype.storm.utils.ThriftTopologyUtils;
-
+import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.schedule.TopologyAssignContext;
import com.alibaba.jstorm.utils.FailedAssignTopologyException;
import com.alibaba.jstorm.utils.JStormUtils;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+import java.util.*;
+import java.util.Map.Entry;
public class DefaultTopologyAssignContext extends TopologyAssignContext {
@@ -49,19 +39,16 @@ public class DefaultTopologyAssignContext extends TopologyAssignContext {
private final Map<String, List<String>> hostToSid;
private final Set<ResourceWorkerSlot> oldWorkers;
private final Map<String, List<Integer>> componentTasks;
- private final Set<ResourceWorkerSlot> unstoppedWorkers =
- new HashSet<ResourceWorkerSlot>();
+ private final Set<ResourceWorkerSlot> unstoppedWorkers = new HashSet<ResourceWorkerSlot>();
private final int totalWorkerNum;
private final int unstoppedWorkerNum;
private int computeWorkerNum() {
- Integer settingNum =
- JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_WORKERS));
+ Integer settingNum = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_WORKERS));
- int hintSum = 0;
+ int ret = 0, hintSum = 0, tmCount = 0;
- Map<String, Object> components =
- ThriftTopologyUtils.getComponents(sysTopology);
+ Map<String, Object> components = ThriftTopologyUtils.getComponents(sysTopology);
for (Entry<String, Object> entry : components.entrySet()) {
String componentName = entry.getKey();
Object component = entry.getValue();
@@ -78,14 +65,35 @@ public class DefaultTopologyAssignContext extends TopologyAssignContext {
}
int hint = common.get_parallelism_hint();
+ if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
+ tmCount += hint;
+ continue;
+ }
hintSum += hint;
}
if (settingNum == null) {
- return hintSum;
+ ret = hintSum;
} else {
- return Math.min(settingNum, hintSum);
+ ret = Math.min(settingNum, hintSum);
}
+
+ Boolean isTmSingleWorker = ConfigExtension.getTopologyMasterSingleWorker(stormConf);
+ if (isTmSingleWorker != null) {
+ if (isTmSingleWorker == true) {
+ // Assign a single worker for topology master
+ ret += tmCount;
+ setAssignSingleWorkerForTM(true);
+ }
+ } else {
+ // If not configured, judge this config by worker number
+ if (ret >= 10) {
+ ret += tmCount;
+ setAssignSingleWorkerForTM(true);
+ }
+ }
+
+ return ret;
}
public int computeUnstoppedAssignments() {
@@ -149,8 +157,7 @@ public class DefaultTopologyAssignContext extends TopologyAssignContext {
try {
sysTopology = Common.system_topology(stormConf, rawTopology);
} catch (Exception e) {
- throw new FailedAssignTopologyException(
- "Failed to generate system topology");
+ throw new FailedAssignTopologyException("Failed to generate system topology");
}
sidToHostname = generateSidToHost();
@@ -215,7 +222,6 @@ public class DefaultTopologyAssignContext extends TopologyAssignContext {
}
public String toDetailString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java
index 5df7de4..99ba9da 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/DefaultTopologyScheduler.java
@@ -32,8 +32,7 @@ import com.alibaba.jstorm.schedule.TopologyAssignContext;
import com.alibaba.jstorm.utils.FailedAssignTopologyException;
public class DefaultTopologyScheduler implements IToplogyScheduler {
- private static final Logger LOG = LoggerFactory
- .getLogger(DefaultTopologyScheduler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyScheduler.class);
private Map nimbusConf;
@@ -57,8 +56,7 @@ public class DefaultTopologyScheduler implements IToplogyScheduler {
for (Integer task : canFree) {
ResourceWorkerSlot worker = oldAssigns.getWorkerByTaskId(task);
if (worker == null) {
- LOG.warn("When free rebalance resource, no ResourceAssignment of task "
- + task);
+ LOG.warn("When free rebalance resource, no ResourceAssignment of task " + task);
continue;
}
@@ -79,25 +77,22 @@ public class DefaultTopologyScheduler implements IToplogyScheduler {
} else if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) {
needAssign.addAll(context.getAllTaskIds());
needAssign.removeAll(context.getUnstoppedTaskIds());
- } else {
- // monitor
- needAssign.addAll(context.getDeadTaskIds());
+ } else { // ASSIGN_TYPE_MONITOR
+ Set<Integer> deadTasks = context.getDeadTaskIds();
+ needAssign.addAll(deadTasks);
}
return needAssign;
}
/**
- * Get the task Map which the task is alive and will be kept Only when type
- * is ASSIGN_TYPE_MONITOR, it is valid
+ * Get the task Map which the task is alive and will be kept Only when type is ASSIGN_TYPE_MONITOR, it is valid
*
* @param defaultContext
* @param needAssigns
* @return
*/
- public Set<ResourceWorkerSlot> getKeepAssign(
- DefaultTopologyAssignContext defaultContext,
- Set<Integer> needAssigns) {
+ public Set<ResourceWorkerSlot> getKeepAssign(DefaultTopologyAssignContext defaultContext, Set<Integer> needAssigns) {
Set<Integer> keepAssignIds = new HashSet<Integer>();
keepAssignIds.addAll(defaultContext.getAllTaskIds());
@@ -125,21 +120,17 @@ public class DefaultTopologyScheduler implements IToplogyScheduler {
}
@Override
- public Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext context)
- throws FailedAssignTopologyException {
+ public Set<ResourceWorkerSlot> assignTasks(TopologyAssignContext context) throws FailedAssignTopologyException {
int assignType = context.getAssignType();
if (TopologyAssignContext.isAssignTypeValid(assignType) == false) {
- throw new FailedAssignTopologyException("Invalide Assign Type "
- + assignType);
+ throw new FailedAssignTopologyException("Invalide Assign Type " + assignType);
}
- DefaultTopologyAssignContext defaultContext =
- new DefaultTopologyAssignContext(context);
+ DefaultTopologyAssignContext defaultContext = new DefaultTopologyAssignContext(context);
if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) {
/**
- * Mark all current assigned worker as available. Current assignment
- * will be restored in task scheduler.
+ * Mark all current assigned worker as available. Current assignment will be restored in task scheduler.
*/
freeUsed(defaultContext);
}
@@ -148,36 +139,24 @@ public class DefaultTopologyScheduler implements IToplogyScheduler {
Set<Integer> needAssignTasks = getNeedAssignTasks(defaultContext);
- Set<ResourceWorkerSlot> keepAssigns =
- getKeepAssign(defaultContext, needAssignTasks);
+ Set<ResourceWorkerSlot> keepAssigns = getKeepAssign(defaultContext, needAssignTasks);
// please use tree map to make task sequence
Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();
ret.addAll(keepAssigns);
ret.addAll(defaultContext.getUnstoppedWorkers());
- int allocWorkerNum =
- defaultContext.getTotalWorkerNum()
- - defaultContext.getUnstoppedWorkerNum()
- - keepAssigns.size();
- LOG.info("allocWorkerNum=" + allocWorkerNum + ", totalWorkerNum="
- + defaultContext.getTotalWorkerNum());
+ int allocWorkerNum = defaultContext.getTotalWorkerNum() - defaultContext.getUnstoppedWorkerNum() - keepAssigns.size();
+ LOG.info("allocWorkerNum=" + allocWorkerNum + ", totalWorkerNum=" + defaultContext.getTotalWorkerNum() + ", keepWorkerNum=" + keepAssigns.size());
if (allocWorkerNum <= 0) {
- LOG.warn("Don't need assign workers, all workers are fine "
- + defaultContext.toDetailString());
- throw new FailedAssignTopologyException(
- "Don't need assign worker, all workers are fine ");
+ LOG.warn("Don't need assign workers, all workers are fine " + defaultContext.toDetailString());
+ throw new FailedAssignTopologyException("Don't need assign worker, all workers are fine ");
}
- List<ResourceWorkerSlot> availableWorkers =
- WorkerScheduler.getInstance().getAvailableWorkers(
- defaultContext, needAssignTasks, allocWorkerNum);
- TaskScheduler taskScheduler =
- new TaskScheduler(defaultContext, needAssignTasks,
- availableWorkers);
- Set<ResourceWorkerSlot> assignment =
- new HashSet<ResourceWorkerSlot>(taskScheduler.assign());
+ List<ResourceWorkerSlot> availableWorkers = WorkerScheduler.getInstance().getAvailableWorkers(defaultContext, needAssignTasks, allocWorkerNum);
+ TaskScheduler taskScheduler = new TaskScheduler(defaultContext, needAssignTasks, availableWorkers);
+ Set<ResourceWorkerSlot> assignment = new HashSet<ResourceWorkerSlot>(taskScheduler.assign());
ret.addAll(assignment);
LOG.info("Keep Alive slots:" + keepAssigns);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java
index c218f52..df8eba5 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/ResourceWorkerSlot.java
@@ -39,8 +39,7 @@ import com.alibaba.jstorm.utils.NetWorkUtils;
//one worker 's assignment
public class ResourceWorkerSlot extends WorkerSlot implements Serializable {
- public static Logger LOG = LoggerFactory
- .getLogger(ResourceWorkerSlot.class);
+ public static Logger LOG = LoggerFactory.getLogger(ResourceWorkerSlot.class);
private static final long serialVersionUID = 9138386287559932411L;
private String hostname;
@@ -58,16 +57,14 @@ public class ResourceWorkerSlot extends WorkerSlot implements Serializable {
super(supervisorId, port);
}
- public ResourceWorkerSlot(WorkerAssignment worker,
- Map<String, List<Integer>> componentToTask) {
+ public ResourceWorkerSlot(WorkerAssignment worker, Map<String, List<Integer>> componentToTask) {
super(worker.getNodeId(), worker.getPort());
this.hostname = worker.getHostName();
this.tasks = new HashSet<Integer>();
this.cpu = worker.getCpu();
this.memSize = worker.getMem();
this.jvm = worker.getJvm();
- for (Entry<String, Integer> entry : worker.getComponentToNum()
- .entrySet()) {
+ for (Entry<String, Integer> entry : worker.getComponentToNum().entrySet()) {
List<Integer> tasks = componentToTask.get(entry.getKey());
if (tasks == null || tasks.size() == 0)
continue;
@@ -121,12 +118,10 @@ public class ResourceWorkerSlot extends WorkerSlot implements Serializable {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
- public boolean compareToUserDefineWorker(WorkerAssignment worker,
- Map<Integer, String> taskToComponent) {
+ public boolean compareToUserDefineWorker(WorkerAssignment worker, Map<Integer, String> taskToComponent) {
int cpu = worker.getCpu();
if (cpu != 0 && this.cpu != cpu)
return false;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java
index eed0e39..1130b1a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/schedule/default_assign/Selector/AbstractSelector.java
@@ -36,8 +36,7 @@ public abstract class AbstractSelector implements Selector {
this.context = context;
}
- protected List<ResourceWorkerSlot> selectWorker(
- List<ResourceWorkerSlot> list, Comparator<ResourceWorkerSlot> c) {
+ protected List<ResourceWorkerSlot> selectWorker(List<ResourceWorkerSlot> list, Comparator<ResourceWorkerSlot> c) {
List<ResourceWorkerSlot> result = new ArrayList<ResourceWorkerSlot>();
ResourceWorkerSlot best = null;
for (ResourceWorkerSlot worker : list) {
@@ -58,8 +57,7 @@ public abstract class AbstractSelector implements Selector {
}
@Override
- public List<ResourceWorkerSlot> select(List<ResourceWorkerSlot> result,
- String name) {
+ public List<ResourceWorkerSlot> select(List<ResourceWorkerSlot> result, String name) {
if (result.size() == 1)
return result;
result = this.selectWorker(result, workerComparator.get(name));