You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:41 UTC
[03/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java
index 05f745c..4bfbf2d 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutExecutors.java
@@ -17,53 +17,56 @@
*/
package com.alibaba.jstorm.task.execute.spout;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.Config;
-import backtype.storm.Constants;
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.BatchTuple;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.DisruptorQueue;
-import backtype.storm.utils.WorkerClassLoader;
-
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.common.metric.Histogram;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.common.metric.TimerRatio;
import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TimerConstants;
import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
import com.alibaba.jstorm.metric.JStormMetrics;
+import com.alibaba.jstorm.metric.JStormMetricsReporter;
import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.Task;
-import com.alibaba.jstorm.task.TaskBaseMetric;
-import com.alibaba.jstorm.task.TaskStatus;
-import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.TaskBatchTransfer;
+import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.acker.Acker;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.TupleInfo;
-import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.task.execute.BaseExecutors;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatRunable;
+import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RotatingMap;
+import com.alibaba.jstorm.utils.TimeUtils;
+import com.codahale.metrics.Gauge;
import com.lmax.disruptor.EventHandler;
+import backtype.storm.Config;
+import backtype.storm.Constants;
+import backtype.storm.spout.ISpout;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.BatchTuple;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.WorkerClassLoader;
+
/**
* spout executor
- *
+ * <p/>
* All spout actions will be done here
- *
+ *
* @author yannian/Longda
- *
*/
public class SpoutExecutors extends BaseExecutors implements EventHandler {
private static Logger LOG = LoggerFactory.getLogger(SpoutExecutors.class);
@@ -73,123 +76,107 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
protected backtype.storm.spout.ISpout spout;
protected RotatingMap<Long, TupleInfo> pending;
- protected ISpoutOutputCollector output_collector;
+ protected SpoutOutputCollector outputCollector;
- protected boolean firstTime = true;
+ protected AsmHistogram nextTupleTimer;
+ protected AsmHistogram ackerTimer;
+ protected TimerRatio emptyCpuGauge;
- protected Histogram nextTupleTimer;
- protected Histogram ackerTimer;
- protected TimerRatio emptyCpuCounter;
+ private String topologyId;
+ private String componentId;
+ private int taskId;
protected AsyncLoopThread ackerRunnableThread;
protected boolean isSpoutFullSleep;
- public SpoutExecutors(Task task, backtype.storm.spout.ISpout _spout,
- TaskTransfer _transfer_fn,
- Map<Integer, DisruptorQueue> innerTaskTransfer, Map _storm_conf,
- TaskSendTargets sendTargets, TaskStatus taskStatus,
- TopologyContext topology_context, TopologyContext _user_context,
- TaskBaseMetric _task_stats, ITaskReportErr _report_error) {
- super(task, _transfer_fn, _storm_conf, innerTaskTransfer,
- topology_context, _user_context, _task_stats, taskStatus,
- _report_error);
+ //, backtype.storm.spout.ISpout _spout, TaskTransfer _transfer_fn, Map<Integer, DisruptorQueue> innerTaskTransfer,
+ //Map _storm_conf, TaskSendTargets sendTargets, TaskStatus taskStatus, TopologyContext topology_context, TopologyContext _user_context,
+ //TaskBaseMetric _task_stats, ITaskReportErr _report_error, JStormMetricsReporter metricReporter
+ public SpoutExecutors(Task task) {
+ super(task);
+
+ this.spout = (ISpout)task.getTaskObj();
- this.spout = _spout;
+ this.max_spout_pending = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
- this.max_spout_pending =
- JStormUtils.parseInt(storm_conf
- .get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
+ this.topologyId = sysTopologyCtx.getTopologyId();
+ this.componentId = sysTopologyCtx.getThisComponentId();
+ this.taskId = task.getTaskId();
this.nextTupleTimer =
- JStormMetrics.registerTaskHistogram(taskId,
- MetricDef.EXECUTE_TIME);
+ (AsmHistogram) JStormMetrics.registerTaskMetric(
+ MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EXECUTE_TIME, MetricType.HISTOGRAM), new AsmHistogram());
this.ackerTimer =
- JStormMetrics.registerTaskHistogram(taskId,
- MetricDef.ACKER_TIME);
+ (AsmHistogram) JStormMetrics.registerTaskMetric(
+ MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.ACKER_TIME, MetricType.HISTOGRAM), new AsmHistogram());
- this.emptyCpuCounter = new TimerRatio();
- JStormMetrics.registerTaskGauge(emptyCpuCounter, taskId,
- MetricDef.EMPTY_CPU_RATIO);
+ this.emptyCpuGauge = new TimerRatio();
+ JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EMPTY_CPU_RATIO, MetricType.GAUGE),
+ new AsmGauge(emptyCpuGauge));
isSpoutFullSleep = ConfigExtension.isSpoutPendFullSleep(storm_conf);
- if (ConfigExtension.isTaskBatchTuple(storm_conf)) {
- TaskBatchFlushTrigger batchFlushTrigger =
- new TaskBatchFlushTrigger(5, idStr
- + Constants.SYSTEM_COMPONENT_ID,
- (TaskBatchTransfer) _transfer_fn);
- batchFlushTrigger.register(TimeUnit.MILLISECONDS);
- }
-
LOG.info("isSpoutFullSleep:" + isSpoutFullSleep);
-
- }
-
- public void prepare(TaskSendTargets sendTargets, TaskTransfer transferFn,
- TopologyContext topologyContext) {
-
- JStormMetrics.registerTaskGauge(
- new com.codahale.metrics.Gauge<Double>() {
-
+
+ mkPending();
+
+ JStormMetrics.registerTaskMetric(
+ MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.PENDING_MAP, MetricType.GAUGE), new AsmGauge(
+ new Gauge<Double>() {
@Override
public Double getValue() {
return (double) pending.size();
}
-
- }, taskId, MetricDef.PENDING_MAP);
+ }));
// collector, in fact it call send_spout_msg
- this.output_collector =
- new SpoutCollector(taskId, spout, task_stats, sendTargets,
- storm_conf, transferFn, pending, topologyContext,
- exeQueue, report_error);
-
- try {
- WorkerClassLoader.switchThreadContext();
- this.spout.open(storm_conf, userTopologyCtx,
- new SpoutOutputCollector(output_collector));
- } catch (Throwable e) {
- error = e;
- LOG.error("spout open error ", e);
- report_error.report(e);
- } finally {
- WorkerClassLoader.restoreThreadContext();
- }
+ SpoutCollector collector = new SpoutCollector(task, pending, exeQueue);
+ this.outputCollector = new SpoutOutputCollector(collector);
+ taskTransfer.getBackpressureController().setOutputCollector(outputCollector);
+ taskHbTrigger.setSpoutOutputCollector(outputCollector);
LOG.info("Successfully create SpoutExecutors " + idStr);
-
+ }
+
+ public void mkPending() {
+ // this function must be overridden by subclasses
+ throw new RuntimeException("Should override this function");
}
- public void nextTuple() {
- if (firstTime == true) {
+ @Override
+ public void init() throws Exception {
+
+ this.spout.open(storm_conf, userTopologyCtx, outputCollector);
- int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf);
+ LOG.info("Successfully open SpoutExecutors " + idStr);
+
+ int delayRun = ConfigExtension.getSpoutDelayRunSeconds(storm_conf);
- // wait other bolt is ready
- JStormUtils.sleepMs(delayRun * 1000);
+ // wait other bolt is ready
+ JStormUtils.sleepMs(delayRun * 1000);
- emptyCpuCounter.init();
+ if (taskStatus.isRun()) {
+ spout.activate();
+ } else {
+ spout.deactivate();
+ }
- if (taskStatus.isRun() == true) {
- spout.activate();
- } else {
- spout.deactivate();
- }
+ LOG.info(idStr + " is ready ");
- firstTime = false;
- LOG.info(idStr + " is ready ");
- }
+ }
- if (taskStatus.isRun() == false) {
+ public void nextTuple() {
+
+ if (!taskStatus.isRun()) {
JStormUtils.sleepMs(1);
return;
}
// if don't need ack, pending map will be always empty
if (max_spout_pending == null || pending.size() < max_spout_pending) {
- emptyCpuCounter.stop();
+ emptyCpuGauge.stop();
long start = System.nanoTime();
try {
@@ -200,15 +187,13 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
report_error.report(e);
} finally {
long end = System.nanoTime();
- nextTupleTimer.update((end - start) / 1000000.0d);
+ nextTupleTimer.update((end - start) / TimeUtils.NS_PER_US);
}
-
- return;
} else {
if (isSpoutFullSleep) {
JStormUtils.sleepMs(1);
}
- emptyCpuCounter.start();
+ emptyCpuGauge.start();
// just return, no sleep
}
}
@@ -221,25 +206,23 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
/**
* Handle acker message
- *
- * @see com.lmax.disruptor.EventHandler#onEvent(java.lang.Object, long,
- * boolean)
+ *
+ * @see EventHandler#onEvent(Object, long, boolean)
*/
@Override
- public void onEvent(Object event, long sequence, boolean endOfBatch)
- throws Exception {
+ public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
long start = System.nanoTime();
try {
-
if (event == null) {
return;
}
-
Runnable runnable = null;
if (event instanceof Tuple) {
+ processControlEvent();
runnable = processTupleEvent((Tuple) event);
} else if (event instanceof BatchTuple) {
for (Tuple tuple : ((BatchTuple) event).getTuples()) {
+ processControlEvent();
runnable = processTupleEvent(tuple);
if (runnable != null) {
runnable.run();
@@ -257,8 +240,7 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
runnable = (Runnable) event;
} else {
- LOG.warn("Receive one unknow event-" + event.toString() + " "
- + idStr);
+ LOG.warn("Receive one unknow event-" + event.toString() + " " + idStr);
return;
}
@@ -272,42 +254,43 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
}
} finally {
long end = System.nanoTime();
- ackerTimer.update((end - start) / 1000000.0d);
+ ackerTimer.update((end - start) / TimeUtils.NS_PER_US);
}
}
private Runnable processTupleEvent(Tuple event) {
- Runnable runnable;
+ Runnable runnable = null;
Tuple tuple = (Tuple) event;
- Object id = tuple.getValue(0);
- Object obj = pending.remove((Long) id);
-
- if (obj == null) {
- if (isDebug) {
- LOG.info("Pending map no entry:" + id);
- }
- runnable = null;
+ if (event.getSourceStreamId().equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
+ TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) tuple.getValueByField("ctrlEvent");
+ taskTransfer.getBackpressureController().control(ctrlEvent);
} else {
- TupleInfo tupleInfo = (TupleInfo) obj;
+ Object id = tuple.getValue(0);
+ Object obj = pending.remove((Long) id);
- String stream_id = tuple.getSourceStreamId();
+ if (obj == null) {
+ if (isDebug) {
+ LOG.info("Pending map no entry:" + id);
+ }
+ runnable = null;
+ } else {
+ TupleInfo tupleInfo = (TupleInfo) obj;
- if (stream_id.equals(Acker.ACKER_ACK_STREAM_ID)) {
+ String stream_id = tuple.getSourceStreamId();
- runnable =
- new AckSpoutMsg(spout, tupleInfo, task_stats, isDebug);
- } else if (stream_id.equals(Acker.ACKER_FAIL_STREAM_ID)) {
- runnable =
- new FailSpoutMsg(id, spout, tupleInfo, task_stats,
- isDebug);
- } else {
- LOG.warn("Receive one unknow source Tuple " + idStr);
- runnable = null;
+ if (stream_id.equals(Acker.ACKER_ACK_STREAM_ID)) {
+
+ runnable = new AckSpoutMsg(spout, tuple, tupleInfo, task_stats, isDebug);
+ } else if (stream_id.equals(Acker.ACKER_FAIL_STREAM_ID)) {
+ runnable = new FailSpoutMsg(id, spout, tupleInfo, task_stats, isDebug);
+ } else {
+ LOG.warn("Receive one unknow source Tuple " + idStr);
+ runnable = null;
+ }
}
- }
- task_stats.recv_tuple(tuple.getSourceComponent(),
- tuple.getSourceStreamId());
+ task_stats.recv_tuple(tuple.getSourceComponent(), tuple.getSourceStreamId());
+ }
return runnable;
}
@@ -317,28 +300,23 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
private void processTimerEvent(TimerTrigger.TimerEvent event) {
switch (event.getOpCode()) {
- case TimerConstants.ROTATING_MAP: {
- Map<Long, TupleInfo> timeoutMap = pending.rotate();
- for (java.util.Map.Entry<Long, TupleInfo> entry : timeoutMap
- .entrySet()) {
- TupleInfo tupleInfo = entry.getValue();
- FailSpoutMsg fail =
- new FailSpoutMsg(entry.getKey(), spout,
- (TupleInfo) tupleInfo, task_stats, isDebug);
- fail.run();
+ case TimerConstants.ROTATING_MAP: {
+ Map<Long, TupleInfo> timeoutMap = pending.rotate();
+ for (Map.Entry<Long, TupleInfo> entry : timeoutMap.entrySet()) {
+ TupleInfo tupleInfo = entry.getValue();
+ FailSpoutMsg fail = new FailSpoutMsg(entry.getKey(), spout, (TupleInfo) tupleInfo, task_stats, isDebug);
+ fail.run();
+ }
+ break;
+ }
+ case TimerConstants.TASK_HEARTBEAT: {
+ taskHbTrigger.setExeThreadHbTime(TimeUtils.current_time_secs());
+ break;
+ }
+ default: {
+ LOG.warn("Receive unsupported timer event, opcode=" + event.getOpCode());
+ break;
}
- break;
- }
- case TimerConstants.TASK_HEARTBEAT: {
- Integer taskId = (Integer) event.getMsg();
- TaskHeartbeatRunable.updateTaskHbStats(taskId, task);
- break;
- }
- default: {
- LOG.warn("Receive unsupported timer event, opcode="
- + event.getOpCode());
- break;
- }
}
}
@@ -349,9 +327,12 @@ public class SpoutExecutors extends BaseExecutors implements EventHandler {
if (event instanceof TimerTrigger.TimerEvent) {
processTimerEvent((TimerTrigger.TimerEvent) event);
} else {
- LOG.warn("Received unknown control event, "
- + event.getClass().getName());
+ LOG.warn("Received unknown control event, " + event.getClass().getName());
}
}
}
+
+ public Object getOutputCollector() {
+ return outputCollector;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java
index 968831b..b64bc30 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/execute/spout/SpoutTimeoutCallBack.java
@@ -31,8 +31,7 @@ import com.alibaba.jstorm.utils.ExpiredCallback;
import com.alibaba.jstorm.utils.JStormUtils;
public class SpoutTimeoutCallBack<K, V> implements ExpiredCallback<K, V> {
- private static Logger LOG = LoggerFactory
- .getLogger(SpoutTimeoutCallBack.class);
+ private static Logger LOG = LoggerFactory.getLogger(SpoutTimeoutCallBack.class);
private DisruptorQueue disruptorEventQueue;
private backtype.storm.spout.ISpout spout;
@@ -40,16 +39,12 @@ public class SpoutTimeoutCallBack<K, V> implements ExpiredCallback<K, V> {
private TaskBaseMetric task_stats;
private boolean isDebug;
- public SpoutTimeoutCallBack(DisruptorQueue disruptorEventQueue,
- backtype.storm.spout.ISpout _spout, Map _storm_conf,
- TaskBaseMetric stat) {
+ public SpoutTimeoutCallBack(DisruptorQueue disruptorEventQueue, backtype.storm.spout.ISpout _spout, Map _storm_conf, TaskBaseMetric stat) {
this.storm_conf = _storm_conf;
this.disruptorEventQueue = disruptorEventQueue;
this.spout = _spout;
this.task_stats = stat;
- this.isDebug =
- JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG),
- false);
+ this.isDebug = JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), false);
}
/**
@@ -62,9 +57,7 @@ public class SpoutTimeoutCallBack<K, V> implements ExpiredCallback<K, V> {
}
try {
TupleInfo tupleInfo = (TupleInfo) val;
- FailSpoutMsg fail =
- new FailSpoutMsg(key, spout, (TupleInfo) tupleInfo,
- task_stats, isDebug);
+ FailSpoutMsg fail = new FailSpoutMsg(key, spout, (TupleInfo) tupleInfo, task_stats, isDebug);
disruptorEventQueue.publish(fail);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java
index bb6ad9c..46eefe8 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkCustomGrouper.java
@@ -34,9 +34,7 @@ public class MkCustomGrouper {
private int myTaskId;
- public MkCustomGrouper(TopologyContext context,
- CustomStreamGrouping _grouping, GlobalStreamId stream,
- List<Integer> targetTask, int myTaskId) {
+ public MkCustomGrouper(TopologyContext context, CustomStreamGrouping _grouping, GlobalStreamId stream, List<Integer> targetTask, int myTaskId) {
this.myTaskId = myTaskId;
this.grouping = _grouping;
this.grouping.prepare(context, stream, targetTask);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java
index 3bf6518..66f2567 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkFieldsGrouper.java
@@ -26,17 +26,16 @@ import com.alibaba.jstorm.utils.JStormUtils;
/**
* field grouping
- *
+ *
* @author yannian
- *
+ *
*/
public class MkFieldsGrouper {
private Fields out_fields;
private Fields group_fields;
private List<Integer> out_tasks;
- public MkFieldsGrouper(Fields _out_fields, Fields _group_fields,
- List<Integer> _out_tasks) {
+ public MkFieldsGrouper(Fields _out_fields, Fields _group_fields, List<Integer> _out_tasks) {
for (Iterator<String> it = _group_fields.iterator(); it.hasNext();) {
String groupField = it.next();
@@ -52,8 +51,7 @@ public class MkFieldsGrouper {
}
public List<Integer> grouper(List<Object> values) {
- int hashcode =
- this.out_fields.select(this.group_fields, values).hashCode();
+ int hashcode = this.out_fields.select(this.group_fields, values).hashCode();
int group = Math.abs(hashcode % this.out_tasks.size());
return JStormUtils.mk_list(out_tasks.get(group));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java
index 5408afd..30d641c 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkGrouper.java
@@ -17,14 +17,6 @@
*/
package com.alibaba.jstorm.task.group;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.JavaObject;
@@ -32,11 +24,17 @@ import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
-
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RandomRange;
import com.alibaba.jstorm.utils.Thrift;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
/**
* Grouper, get which task should be send to for one tuple
@@ -66,9 +64,8 @@ public class MkGrouper {
private MkLocalShuffer local_shuffer_grouper;
private MkLocalFirst localFirst;
- public MkGrouper(TopologyContext _topology_context, Fields _out_fields,
- Grouping _thrift_grouping, List<Integer> _outTasks,
- String streamId, WorkerData workerData) {
+ public MkGrouper(TopologyContext _topology_context, Fields _out_fields, Grouping _thrift_grouping, List<Integer> _outTasks, String streamId,
+ WorkerData workerData) {
this.topology_context = _topology_context;
this.out_fields = _out_fields;
this.thrift_grouping = _thrift_grouping;
@@ -83,8 +80,7 @@ public class MkGrouper {
this.grouptype = this.parseGroupType(workerData);
String id = _topology_context.getThisTaskId() + ":" + streamId;
- LOG.info(id + " grouptype is " + grouptype + ", out_tasks is "
- + out_tasks + ", local_tasks" + local_tasks);
+ LOG.info(id + " grouptype is " + grouptype + ", out_tasks is " + out_tasks + ", local_tasks" + local_tasks);
}
@@ -104,12 +100,10 @@ public class MkGrouper {
grouperType = GrouperType.global;
} else {
- List<String> fields_group =
- Thrift.fieldGrouping(thrift_grouping);
+ List<String> fields_group = Thrift.fieldGrouping(thrift_grouping);
Fields fields = new Fields(fields_group);
- fields_grouper =
- new MkFieldsGrouper(out_fields, fields, out_tasks);
+ fields_grouper = new MkFieldsGrouper(out_fields, fields, out_tasks);
// hashcode by fields
grouperType = GrouperType.fields;
@@ -132,29 +126,23 @@ public class MkGrouper {
int myTaskId = topology_context.getThisTaskId();
String componentId = topology_context.getComponentId(myTaskId);
GlobalStreamId stream = new GlobalStreamId(componentId, streamId);
- custom_grouper =
- new MkCustomGrouper(topology_context, g, stream, out_tasks,
- myTaskId);
+ custom_grouper = new MkCustomGrouper(topology_context, g, stream, out_tasks, myTaskId);
grouperType = GrouperType.custom_obj;
} else if (Grouping._Fields.CUSTOM_SERIALIZED.equals(fields)) {
// user custom group by serialized Object
byte[] obj = thrift_grouping.get_custom_serialized();
- CustomStreamGrouping g =
- (CustomStreamGrouping) Utils.javaDeserialize(obj);
+ CustomStreamGrouping g = (CustomStreamGrouping) Utils.javaDeserialize(obj);
int myTaskId = topology_context.getThisTaskId();
String componentId = topology_context.getComponentId(myTaskId);
GlobalStreamId stream = new GlobalStreamId(componentId, streamId);
- custom_grouper =
- new MkCustomGrouper(topology_context, g, stream, out_tasks,
- myTaskId);
+ custom_grouper = new MkCustomGrouper(topology_context, g, stream, out_tasks, myTaskId);
grouperType = GrouperType.custom_serialized;
} else if (Grouping._Fields.DIRECT.equals(fields)) {
// directly send to a special task
grouperType = GrouperType.direct;
} else if (Grouping._Fields.LOCAL_OR_SHUFFLE.equals(fields)) {
grouperType = GrouperType.local_or_shuffle;
- local_shuffer_grouper =
- new MkLocalShuffer(local_tasks, out_tasks, workerData);
+ local_shuffer_grouper = new MkLocalShuffer(local_tasks, out_tasks, workerData);
} else if (Grouping._Fields.LOCAL_FIRST.equals(fields)) {
grouperType = GrouperType.localFirst;
localFirst = new MkLocalFirst(local_tasks, out_tasks, workerData);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java
index 56f9175..92fa18b 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalFirst.java
@@ -40,8 +40,7 @@ import com.alibaba.jstorm.utils.RandomRange;
* @version
*/
public class MkLocalFirst extends Shuffer {
- private static final Logger LOG = LoggerFactory
- .getLogger(MkLocalFirst.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MkLocalFirst.class);
private List<Integer> allOutTasks = new ArrayList<Integer>();
private List<Integer> localOutTasks = new ArrayList<Integer>();
@@ -52,8 +51,7 @@ public class MkLocalFirst extends Shuffer {
private WorkerData workerData;
private IntervalCheck intervalCheck;
- public MkLocalFirst(List<Integer> workerTasks, List<Integer> allOutTasks,
- WorkerData workerData) {
+ public MkLocalFirst(List<Integer> workerTasks, List<Integer> allOutTasks, WorkerData workerData) {
super(workerData);
intervalCheck = new IntervalCheck();
@@ -74,7 +72,6 @@ public class MkLocalFirst extends Shuffer {
if (localWorkerOutTasks.size() != 0) {
isLocalWorkerAvail = true;
localOutTasks.addAll(localWorkerOutTasks);
- remoteOutTasks.removeAll(localWorkerOutTasks);
} else {
isLocalWorkerAvail = false;
}
@@ -93,8 +90,7 @@ public class MkLocalFirst extends Shuffer {
for (i = 0; i < size; i++) {
Integer taskId = outTasks.get(index);
boolean taskStatus = workerData.isOutboundTaskActive(taskId);
- DisruptorQueue exeQueue =
- (workerData.getInnerTaskTransfer().get(taskId));
+ DisruptorQueue exeQueue = (workerData.getInnerTaskTransfer().get(taskId));
float queueLoadRatio = exeQueue != null ? exeQueue.pctFull() : 0;
if (taskStatus && queueLoadRatio < 1.0)
break;
@@ -123,7 +119,6 @@ public class MkLocalFirst extends Shuffer {
}
return JStormUtils.mk_list(remoteOutTasks.get(index));
}
-
public List<Integer> grouper(List<Object> values) {
List<Integer> ret;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java
index 324e1e6..c57d380 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/MkLocalShuffer.java
@@ -1,37 +1,28 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package com.alibaba.jstorm.task.group;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
import com.alibaba.jstorm.daemon.worker.WorkerData;
+import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RandomRange;
public class MkLocalShuffer extends Shuffer {
+ private static final Logger LOG = Logger.getLogger(MkLocalShuffer.class);
private List<Integer> outTasks;
private RandomRange randomrange;
+ private Set<Integer> lastLocalNodeTasks;
+ private IntervalCheck intervalCheck;
+ private WorkerData workerData;
private boolean isLocal;
public MkLocalShuffer(List<Integer> workerTasks, List<Integer> allOutTasks,
- WorkerData workerData) {
+ WorkerData workerData) {
super(workerData);
List<Integer> localOutTasks = new ArrayList<Integer>();
@@ -40,6 +31,9 @@ public class MkLocalShuffer extends Shuffer {
localOutTasks.add(outTask);
}
}
+ this.workerData = workerData;
+ intervalCheck = new IntervalCheck();
+ intervalCheck.setInterval(60);
if (localOutTasks.size() != 0) {
this.outTasks = localOutTasks;
@@ -47,13 +41,43 @@ public class MkLocalShuffer extends Shuffer {
} else {
this.outTasks = new ArrayList<Integer>();
this.outTasks.addAll(allOutTasks);
+ refreshLocalNodeTasks();
isLocal = false;
}
+ randomrange = new RandomRange(outTasks.size());
+ }
+
+ /**
+ * No need to handle multiple threads here: one task, one thread
+ */
+ private void refreshLocalNodeTasks() {
+ Set<Integer> localNodeTasks = workerData.getLocalNodeTasks();
+
+ if (localNodeTasks == null || localNodeTasks.equals(lastLocalNodeTasks) ) {
+ return;
+ }
+ LOG.info("Old localNodeTasks:" + lastLocalNodeTasks + ", new:"
+ + localNodeTasks);
+ lastLocalNodeTasks = localNodeTasks;
+
+ List<Integer> localNodeOutTasks = new ArrayList<Integer>();
+ for (Integer outTask : outTasks) {
+ if (localNodeTasks.contains(outTask)) {
+ localNodeOutTasks.add(outTask);
+ }
+ }
+
+ if (localNodeOutTasks.isEmpty() == false) {
+ this.outTasks = localNodeOutTasks;
+ }
randomrange = new RandomRange(outTasks.size());
}
public List<Integer> grouper(List<Object> values) {
+ if (!isLocal && intervalCheck.check()) {
+ refreshLocalNodeTasks();
+ }
int index = getActiveTask(randomrange, outTasks);
// If none active tasks were found, still send message to a task
if (index == -1)
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java
index acad674..3d272bf 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/group/Shuffer.java
@@ -38,8 +38,7 @@ public abstract class Shuffer {
int i = 0;
for (i = 0; i < size; i++) {
- if (workerData.isOutboundTaskActive(Integer.valueOf(outTasks
- .get(index))))
+ if (workerData.isOutboundTaskActive(Integer.valueOf(outTasks.get(index))))
break;
else
index = randomrange.nextInt();
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeat.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeat.java
deleted file mode 100755
index 532f553..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeat.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.task.heartbeat;
-
-import java.io.Serializable;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-
-/**
- * Task heartbeat, this Object will be updated to ZK timely
- *
- * @author yannian
- *
- */
-public class TaskHeartbeat implements Serializable {
-
- private static final long serialVersionUID = -6369195955255963810L;
- private Integer timeSecs;
- private Integer uptimeSecs;
-
- public TaskHeartbeat(int timeSecs, int uptimeSecs) {
- this.timeSecs = timeSecs;
- this.uptimeSecs = uptimeSecs;
- }
-
- public int getTimeSecs() {
- return timeSecs;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
- }
-
- public void setTimeSecs(int timeSecs) {
- this.timeSecs = timeSecs;
- }
-
- public int getUptimeSecs() {
- return uptimeSecs;
- }
-
- public void setUptimeSecs(int uptimeSecs) {
- this.uptimeSecs = uptimeSecs;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result =
- prime * result + ((timeSecs == null) ? 0 : timeSecs.hashCode());
- result =
- prime * result
- + ((uptimeSecs == null) ? 0 : uptimeSecs.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- TaskHeartbeat other = (TaskHeartbeat) obj;
- if (timeSecs == null) {
- if (other.timeSecs != null)
- return false;
- } else if (!timeSecs.equals(other.timeSecs))
- return false;
- if (uptimeSecs == null) {
- if (other.uptimeSecs != null)
- return false;
- } else if (!uptimeSecs.equals(other.uptimeSecs))
- return false;
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatRunable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatRunable.java
deleted file mode 100644
index be66911..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatRunable.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.task.heartbeat;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-
-import com.alibaba.jstorm.callback.RunnableCallback;
-import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.daemon.worker.WorkerData;
-import com.alibaba.jstorm.schedule.Assignment.AssignmentType;
-import com.alibaba.jstorm.task.Task;
-import com.alibaba.jstorm.task.UptimeComputer;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.TimeUtils;
-
-/**
- * Task hearbeat
- *
- * @author yannian
- *
- */
-public class TaskHeartbeatRunable extends RunnableCallback {
- private static final Logger LOG = LoggerFactory
- .getLogger(TaskHeartbeatRunable.class);
-
- private StormClusterState zkCluster;
- private String topology_id;
- private UptimeComputer uptime;
- private Map storm_conf;
- private Integer frequence;
- private Map<Integer, Long> taskAssignTsMap = new HashMap<Integer, Long>();
-
- private static Map<Integer, TaskStats> taskStatsMap =
- new HashMap<Integer, TaskStats>();
- private static LinkedBlockingDeque<Event> eventQueue =
- new LinkedBlockingDeque<TaskHeartbeatRunable.Event>();
-
- public static void registerTaskStats(int taskId, TaskStats taskStats) {
- Event event = new Event(Event.REGISTER_TYPE, taskId, taskStats);
- eventQueue.offer(event);
- }
-
- public static void unregisterTaskStats(int taskId) {
- Event event = new Event(Event.UNREGISTER_TYPE, taskId, null);
- eventQueue.offer(event);
- }
-
- public static void updateTaskHbStats(int taskId, Task taskData) {
- Event event = new Event(Event.TASK_HEARTBEAT_TYPE, taskId, taskData);
- eventQueue.offer(event);
- }
-
- public TaskHeartbeatRunable(WorkerData workerData) {
-
- this.zkCluster = workerData.getZkCluster();
- this.topology_id = workerData.getTopologyId();
- this.uptime = new UptimeComputer();
- this.storm_conf = workerData.getStormConf();
-
- String key = Config.TASK_HEARTBEAT_FREQUENCY_SECS;
- Object time = storm_conf.get(key);
- frequence = JStormUtils.parseInt(time, 10);
-
- }
-
- public void handle() throws InterruptedException {
- Event event = eventQueue.take();
- while (event != null) {
- switch (event.getType()) {
- case Event.TASK_HEARTBEAT_TYPE: {
- updateTaskHbStats(event);
- break;
- }
- case Event.REGISTER_TYPE: {
- Event<TaskStats> regEvent = event;
- taskStatsMap.put(event.getTaskId(), regEvent.getEventValue());
- taskAssignTsMap.put(event.getTaskId(),
- System.currentTimeMillis());
- break;
- }
- case Event.UNREGISTER_TYPE: {
- taskStatsMap.remove(event.getTaskId());
- taskAssignTsMap.remove(event.getTaskId());
- break;
- }
- default: {
- LOG.warn("Unknown event type received:" + event.getType());
- break;
- }
- }
-
- event = eventQueue.take();
- }
- }
-
- @Override
- public void run() {
- try {
- handle();
- } catch (InterruptedException e) {
- LOG.info(e.getMessage());
- }
- }
-
- @Override
- public Object getResult() {
- return frequence;
- }
-
- public void updateTaskHbStats(Event event) {
- Integer currtime = TimeUtils.current_time_secs();
- Event<Task> taskHbEvent = event;
- int taskId = taskHbEvent.getTaskId();
- String idStr = " " + topology_id + ":" + taskId + " ";
-
- try {
-
- TaskHeartbeat hb = new TaskHeartbeat(currtime, uptime.uptime());
- zkCluster.task_heartbeat(topology_id, taskId, hb);
-
- LOG.info("update task hearbeat ts " + currtime + " for" + idStr);
-
- // Check if assignment is changed. e.g scale-out
- Task task = taskHbEvent.getEventValue();
- Long timeStamp = taskAssignTsMap.get(taskId);
- if (timeStamp != null) {
- if (timeStamp < task.getWorkerAssignmentTs() &&
- task.getWorkerAssignmentType().equals(AssignmentType.Assign)) {
- LOG.info("Start to update the task data for task-" + taskId);
- task.updateTaskData();
- taskAssignTsMap.put(taskId, task.getWorkerAssignmentTs());
- }
- }
- } catch (Exception e) {
- // TODO Auto-generated catch block
- String errMsg = "Failed to update heartbeat to ZK " + idStr + "\n";
- LOG.error(errMsg, e);
- }
- }
-
- private static class Event<T> {
- public static final int REGISTER_TYPE = 0;
- public static final int UNREGISTER_TYPE = 1;
- public static final int TASK_HEARTBEAT_TYPE = 2;
- private final int type;
- private final int taskId;
- private final T value;
-
- public Event(int type, int taskId, T value) {
- this.type = type;
- this.taskId = taskId;
- this.value = value;
- }
-
- public int getType() {
- return type;
- }
-
- public int getTaskId() {
- return taskId;
- }
-
- public T getEventValue() {
- return value;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.java
new file mode 100644
index 0000000..86d72f4
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/heartbeat/TaskHeartbeatUpdater.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.heartbeat;
+
+import backtype.storm.generated.TaskHeartbeat;
+import backtype.storm.generated.TopologyTaskHbInfo;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.NimbusClient;
+
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.cluster.StormClusterState;
+import com.alibaba.jstorm.task.UptimeComputer;
+import com.alibaba.jstorm.utils.TimeUtils;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Update the task heartbeat information of topology to Nimbus
+ *
+ * @author Basti Liu
+ *
+ */
+public class TaskHeartbeatUpdater{
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TaskHeartbeatUpdater.class);
+
+ private int MAX_NUM_TASK_HB_SEND;
+
+ private String topologyId;
+ private int taskId;
+
+ private Map conf;
+ private NimbusClient client;
+
+ private Map<Integer, TaskHeartbeat> taskHbMap;
+ private TopologyTaskHbInfo taskHbs;
+
+ private StormClusterState zkCluster;
+
+ public TaskHeartbeatUpdater(Map conf, String topologyId, int taskId, StormClusterState zkCluster) {
+ this.topologyId = topologyId;
+ this.taskId = taskId;
+
+ this.conf = conf;
+ this.client = NimbusClient.getConfiguredClient(conf);
+
+ this.zkCluster = zkCluster;
+
+ try {
+ TopologyTaskHbInfo taskHbInfo = zkCluster.topology_heartbeat(topologyId);
+ if (taskHbInfo != null) {
+ LOG.info("Found task heartbeat info left in zk for " + topologyId + ": " + taskHbInfo.toString());
+ this.taskHbs = taskHbInfo;
+ this.taskHbMap = taskHbInfo.get_taskHbs();
+ if (this.taskHbMap == null) {
+ this.taskHbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>();
+ taskHbs.set_taskHbs(this.taskHbMap);
+ }
+ this.taskHbs.set_topologyId(topologyId);
+ this.taskHbs.set_topologyMasterId(this.taskId);
+ } else {
+ LOG.info("There is not any previous task heartbeat info left in zk for " + topologyId);
+ this.taskHbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>();
+ this.taskHbs = new TopologyTaskHbInfo(this.topologyId, this.taskId);
+ this.taskHbs.set_taskHbs(taskHbMap);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to get topology heartbeat from zk", e);
+ }
+ this.MAX_NUM_TASK_HB_SEND = ConfigExtension.getTopologyTaskHbSendNumber(conf);
+ }
+
+ public void process(Tuple input) {
+ int sourceTask = input.getSourceTask();
+ int uptime = (Integer) input.getValue(0);
+
+ // Update the heartbeat for source task
+ TaskHeartbeat taskHb = taskHbMap.get(sourceTask);
+ if (taskHb == null) {
+ taskHb = new TaskHeartbeat(TimeUtils.current_time_secs(), uptime);
+ taskHbMap.put(sourceTask, taskHb);
+ } else {
+ taskHb.set_time(TimeUtils.current_time_secs());
+ taskHb.set_uptime(uptime);
+ }
+
+ // Send heartbeat info of all tasks to nimbus
+ if (sourceTask == taskId) {
+ // Send heartbeat info of MAX_NUM_TASK_HB_SEND tasks each time
+ TopologyTaskHbInfo tmpTaskHbInfo = new TopologyTaskHbInfo(topologyId, taskId);
+ Map<Integer, TaskHeartbeat> tmpTaskHbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>();
+ tmpTaskHbInfo.set_taskHbs(tmpTaskHbMap);
+
+ int sendCount = 0;
+ for (Entry<Integer, TaskHeartbeat> entry : taskHbMap.entrySet()) {
+ tmpTaskHbMap.put(entry.getKey(), entry.getValue());
+ sendCount++;
+
+ if (sendCount >= MAX_NUM_TASK_HB_SEND) {
+ setTaskHeatbeat(tmpTaskHbInfo);
+ tmpTaskHbMap.clear();
+ sendCount = 0;
+ }
+ }
+ if (tmpTaskHbMap.size() > 0) {
+ setTaskHeatbeat(tmpTaskHbInfo);
+ }
+ }
+ }
+
+ private void setTaskHeatbeat(TopologyTaskHbInfo topologyTaskHbInfo) {
+ try {
+ if (topologyTaskHbInfo == null) {
+ return;
+ }
+ if (topologyTaskHbInfo.get_taskHbs() == null) {
+ return;
+ }
+
+ client.getClient().updateTaskHeartbeat(topologyTaskHbInfo);
+
+ String info = "";
+ for (Entry<Integer, TaskHeartbeat> entry : topologyTaskHbInfo.get_taskHbs().entrySet()) {
+ info += " " + entry.getKey() + "-" + entry.getValue().get_time();
+ }
+ LOG.info("Update task heartbeat:" + info);
+ } catch (TException e) {
+ LOG.error("Failed to update task heartbeat info", e);
+ if (client != null) {
+ client.close();
+ client = NimbusClient.getConfiguredClient(conf);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopoMasterCtrlEvent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopoMasterCtrlEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopoMasterCtrlEvent.java
new file mode 100644
index 0000000..adc8dc0
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopoMasterCtrlEvent.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.master;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Definition of control event which is used for the control purpose in
+ * topology, e.g. back pressure
+ *
+ * @author Basti Liu
+ */
+
+public class TopoMasterCtrlEvent implements Serializable {
+
+ private static final long serialVersionUID = 5929540385279089750L;
+
+ public enum EventType {
+ startBackpressure, stopBackpressure, syncBackpressureState, updateBackpressureConfig, defaultType
+ }
+
+ private EventType eventType;
+ private List<Object> eventValue;
+
+ public TopoMasterCtrlEvent() {
+ eventType = EventType.defaultType;
+ eventValue = null;
+ }
+
+ public TopoMasterCtrlEvent(EventType type, List<Object> value) {
+ this.eventType = type;
+ this.eventValue = value;
+ }
+
+ public EventType getEventType() {
+ return eventType;
+ }
+
+ public void setEventType(EventType type) {
+ this.eventType = type;
+ }
+
+ public List<Object> getEventValue() {
+ return eventValue;
+ }
+
+ public void setEventValue(List<Object> value) {
+ this.eventValue = value;
+ }
+
+ public void addEventValue(Object value) {
+ if (eventValue == null) {
+ eventValue = new ArrayList<Object>();
+ }
+
+ eventValue.add(value);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopologyMaster.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopologyMaster.java b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopologyMaster.java
new file mode 100644
index 0000000..b5fb22c
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/task/master/TopologyMaster.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.task.master;
+
+import backtype.storm.generated.*;
+import backtype.storm.task.IBolt;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IDynamicComponent;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.NimbusClient;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.cluster.StormClusterState;
+import com.alibaba.jstorm.cluster.StormConfig;
+import com.alibaba.jstorm.metric.MetaType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.metric.TopologyMetricContext;
+import com.alibaba.jstorm.schedule.Assignment;
+import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
+import com.alibaba.jstorm.task.backpressure.BackpressureCoordinator;
+import com.alibaba.jstorm.task.heartbeat.TaskHeartbeatUpdater;
+import com.alibaba.jstorm.utils.IntervalCheck;
+import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
+import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Topology master is responsible for the process of general topology
+ * information, e.g. task heartbeat update, metrics data update....
+ *
+ * @author Basti Liu
+ */
+public class TopologyMaster implements IBolt, IDynamicComponent {
+
+ private static final long serialVersionUID = 4690656768333833626L;
+
+ private static final Logger LOG = getLogger(TopologyMaster.class);
+ private final Logger metricLogger = getLogger(TopologyMetricContext.class); // metric messages are logged under the TopologyMetricContext logger
+
+ public static final int MAX_BATCH_SIZE = 10000; // threshold above which uploadMetrics/batchUploadMetrics split an upload
+ private final MetricInfo dummy = MetricUtils.mkMetricInfo(); // placeholder passed for the metric groups an upload does not carry
+
+ public static final String FIELD_METRIC_WORKER = "worker"; // tuple field read by updateMetrics for the worker slot
+ public static final String FIELD_METRIC_METRICS = "metrics"; // tuple field read by updateMetrics for the uploaded metrics
+ public static final String FILED_HEARBEAT_EVENT = "hbEvent";
+ public static final String FILED_CTRL_EVENT = "ctrlEvent";
+
+ private Map conf;
+ private StormClusterState zkCluster;
+ private OutputCollector collector;
+
+ private int taskId;
+ private String topologyId;
+ private volatile Set<ResourceWorkerSlot> workerSet; // workers of the assignment read from zkCluster
+ private IntervalCheck intervalCheck; // throttles the assignment refresh in updateTopologyWorkerSet()
+
+ private TaskHeartbeatUpdater taskHeartbeatUpdater;
+
+ private BackpressureCoordinator backpressureCoordinator;
+
+ private TopologyMetricContext topologyMetricContext;
+
+ private ScheduledExecutorService uploadMetricsExecutor; // runs the periodic and on-demand mergeAndUpload() calls
+
+ private Thread updateThread; // runs TopologyMasterRunnable
+ private BlockingQueue<Tuple> queue = new LinkedBlockingDeque<Tuple>(); // tuples handed from execute() to the update thread
+ private IntervalCheck threadAliveCheck; // throttles the update-thread liveness check in execute()
+
+ private volatile boolean isActive = true; // cleared by cleanup() to end the update thread loop
+
+     /**
+      * Consumer loop of the update thread: takes tuples from {@code queue}
+      * and passes each one to {@code eventHandle} until {@code isActive} is
+      * cleared or the thread is interrupted.
+      */
+     private class TopologyMasterRunnable implements Runnable {
+         @Override
+         public void run() {
+             while (isActive) {
+                 try {
+                     Tuple event = queue.take();
+                     if (event != null) {
+                         eventHandle(event);
+                     }
+                 } catch (InterruptedException e) {
+                     // An interrupt is a request to stop, not a processing
+                     // failure: keep the flag set for the caller and leave.
+                     LOG.info("Topology master update thread is interrupted, exit");
+                     Thread.currentThread().interrupt();
+                     break;
+                 } catch (Throwable e) {
+                     // One bad event must not kill the consumer loop
+                     LOG.error("Failed to process event", e);
+                 }
+             }
+         }
+
+     }
+ @Override
+ public void prepare(Map stormConf, TopologyContext context,
+ OutputCollector collector) {
+ this.conf = context.getStormConf();
+ this.collector = collector;
+ this.taskId = context.getThisTaskId();
+ this.topologyId = context.getTopologyId();
+ this.zkCluster = context.getZkCluster();
+
+ try {
+ Assignment assignment = zkCluster.assignment_info(topologyId, null);
+ this.workerSet = assignment.getWorkers();
+ intervalCheck = new IntervalCheck();
+ intervalCheck.setInterval(10);
+ intervalCheck.start();
+ } catch (Exception e) {
+ LOG.error("Failed to get assignment for " + topologyId);
+ }
+
+ this.taskHeartbeatUpdater = new TaskHeartbeatUpdater(this.conf, topologyId, taskId, zkCluster);
+
+ this.backpressureCoordinator = new BackpressureCoordinator(collector, context, taskId);
+
+ this.topologyMetricContext = new TopologyMetricContext(topologyId, this.workerSet, this.conf);
+
+ this.uploadMetricsExecutor = Executors.newSingleThreadScheduledExecutor();
+ this.uploadMetricsExecutor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ int secOffset = TimeUtils.secOffset();
+ int offset = 35;
+ if (secOffset < offset) {
+ JStormUtils.sleepMs((offset - secOffset) * 1000);
+ } else if (secOffset == offset) {
+ // do nothing
+ } else {
+ JStormUtils.sleepMs((60 - secOffset + offset) * 1000);
+ }
+ if (topologyMetricContext.getUploadedWorkerNum() > 0) {
+ metricLogger.info("force upload metrics.");
+ mergeAndUpload();
+ }
+ }
+ }, 5, 60, TimeUnit.SECONDS);
+
+ updateThread = new Thread(new TopologyMasterRunnable());
+ updateThread.start();
+
+ threadAliveCheck = new IntervalCheck();
+ threadAliveCheck.setInterval(30);
+ threadAliveCheck.start();
+ }
+
+     /**
+      * Queues the tuple for the update thread and acks it. Every
+      * {@code threadAliveCheck} interval the update thread is restarted
+      * if it is no longer alive.
+      *
+      * @param input tuple to process; a null tuple is only logged
+      */
+     @Override
+     public void execute(Tuple input) {
+         if (input == null) {
+             LOG.error("Received null tuple!");
+             return;
+         }
+
+         try {
+             queue.put(input);
+         } catch (InterruptedException e) {
+             LOG.error("Failed to put event to taskHb updater's queue", e);
+             // Keep the interrupt visible to the code running this thread
+             Thread.currentThread().interrupt();
+         }
+
+         if (threadAliveCheck.check()) {
+             if (updateThread == null || !updateThread.isAlive()) {
+                 updateThread = new Thread(new TopologyMasterRunnable());
+                 updateThread.start();
+             }
+         }
+
+         collector.ack(input);
+     }
+
+     /**
+      * Stops the background work of the topology master: ends the update
+      * loop, shuts the metric upload executor down and interrupts the
+      * update thread so that it does not stay blocked on the empty queue.
+      */
+     @Override
+     public void cleanup() {
+         isActive = false;
+
+         // Null checks cover a cleanup() that happens without prepare()
+         if (uploadMetricsExecutor != null) {
+             uploadMetricsExecutor.shutdownNow();
+         }
+         if (updateThread != null) {
+             // queue.take() blocks indefinitely; clearing isActive alone
+             // would never wake the thread up.
+             updateThread.interrupt();
+         }
+
+         LOG.info("Successfully cleanup");
+     }
+
+     /**
+      * Re-reads the topology assignment from ZK and refreshes
+      * {@code workerSet}, at most once per {@code intervalCheck} interval.
+      * Failures are logged and leave the previous worker set in place.
+      */
+     private void updateTopologyWorkerSet() {
+         // intervalCheck can be null when prepare() failed to read the
+         // assignment; skipping the refresh keeps event handling alive.
+         if (intervalCheck == null || !intervalCheck.check()) {
+             return;
+         }
+
+         try {
+             Assignment assignment = zkCluster.assignment_info(topologyId, null);
+             if (assignment == null) {
+                 LOG.error("Failed to get assignment for " + topologyId);
+                 return;
+             }
+             this.workerSet = assignment.getWorkers();
+         } catch (Exception e) {
+             LOG.error("Failed to get assignment for " + topologyId, e);
+         }
+     }
+
+ private void eventHandle(Tuple input) {
+ updateTopologyWorkerSet();
+
+ String stream = input.getSourceStreamId();
+
+ try {
+ if (stream.equals(Common.TOPOLOGY_MASTER_HB_STREAM_ID)) {
+ taskHeartbeatUpdater.process(input);
+ } else if (stream.equals(Common.TOPOLOGY_MASTER_METRICS_STREAM_ID)) {
+ updateMetrics(input);
+ } else if (stream.equals(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID)) {
+ backpressureCoordinator.process(input);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to handle event: " + input.toString(), e);
+ }
+ }
+
+     /**
+      * Receives a new configuration and forwards it to the backpressure
+      * coordinator when the coordinator reports a backpressure config change.
+      */
+     @Override
+     public void update(Map conf) {
+         LOG.info("Topology master received new conf:" + conf);
+
+         final boolean backpressureChanged = backpressureCoordinator.isBackpressureConfigChange(conf);
+         if (!backpressureChanged) {
+             return;
+         }
+         backpressureCoordinator.updateBackpressureConfig(conf);
+     }
+
+     /**
+      * Caches the metrics uploaded by one worker and, once the metric
+      * context reports it is ready, submits a merge-and-upload job to the
+      * upload executor.
+      */
+     private void updateMetrics(Tuple input) {
+         final String slot = (String) input.getValueByField(FIELD_METRIC_WORKER);
+         final WorkerUploadMetrics uploaded = (WorkerUploadMetrics) input.getValueByField(FIELD_METRIC_METRICS);
+
+         topologyMetricContext.addToMemCache(slot, uploaded.get_allMetrics());
+         metricLogger.info("received metrics from:{}, size:{}", slot, uploaded.get_allMetrics().get_metrics_size());
+
+         if (!topologyMetricContext.readyToUpload()) {
+             return;
+         }
+
+         metricLogger.info("all {} worker slots have updated metrics, start merging & uploading...",
+                 topologyMetricContext.getWorkerNum());
+         Runnable uploadJob = new Runnable() {
+             @Override
+             public void run() {
+                 mergeAndUpload();
+             }
+         };
+         uploadMetricsExecutor.submit(uploadJob);
+     }
+
+ private void mergeAndUpload() {
+ // double check
+ if (topologyMetricContext.getUploadedWorkerNum() > 0) {
+ TopologyMetric tpMetric = topologyMetricContext.mergeMetrics();
+ if (tpMetric != null) {
+ uploadMetrics(tpMetric);
+ }
+
+ topologyMetricContext.resetUploadedMetrics();
+ //MetricUtils.logMetrics(tpMetric.get_componentMetric());
+ }
+ }
+
+ /**
+ * upload metrics sequentially due to thrift frame size limit (15MB)
+ */
+ private void uploadMetrics(TopologyMetric tpMetric) {
+ long start = System.currentTimeMillis();
+ if (StormConfig.local_mode(conf)) {
+ return;
+ } else {
+ NimbusClient client = null;
+ try {
+ client = NimbusClient.getConfiguredClient(conf);
+ Nimbus.Client client1 = client.getClient();
+
+ MetricInfo topologyMetrics = tpMetric.get_topologyMetric();
+ MetricInfo componentMetrics = tpMetric.get_componentMetric();
+ MetricInfo taskMetrics = tpMetric.get_taskMetric();
+ MetricInfo streamMetrics = tpMetric.get_streamMetric();
+ MetricInfo workerMetrics = tpMetric.get_workerMetric();
+ MetricInfo nettyMetrics = tpMetric.get_nettyMetric();
+
+ int totalSize = topologyMetrics.get_metrics_size() + componentMetrics.get_metrics_size() +
+ taskMetrics.get_metrics_size() + streamMetrics.get_metrics_size() +
+ workerMetrics.get_metrics_size() + nettyMetrics.get_metrics_size();
+
+ // for small topologies, send all metrics together to ease the pressure of nimbus
+ if (totalSize < MAX_BATCH_SIZE) {
+ client1.uploadTopologyMetrics(topologyId,
+ new TopologyMetric(topologyMetrics, componentMetrics, workerMetrics, taskMetrics,
+ streamMetrics, nettyMetrics));
+ } else {
+ client1.uploadTopologyMetrics(topologyId,
+ new TopologyMetric(topologyMetrics, componentMetrics, dummy, dummy, dummy, dummy));
+ batchUploadMetrics(client1, topologyId, workerMetrics, MetaType.WORKER);
+ batchUploadMetrics(client1, topologyId, taskMetrics, MetaType.TASK);
+ batchUploadMetrics(client1, topologyId, streamMetrics, MetaType.STREAM);
+ batchUploadMetrics(client1, topologyId, nettyMetrics, MetaType.NETTY);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to upload worker metrics", e);
+ } finally {
+ if (client != null) {
+ client.close();
+ }
+ }
+ }
+ metricLogger.info("upload metrics, cost:{}", System.currentTimeMillis() - start);
+ }
+
+ private void batchUploadMetrics(Nimbus.Client client, String topologyId, MetricInfo metricInfo, MetaType metaType) {
+ if (metricInfo.get_metrics_size() > MAX_BATCH_SIZE) {
+ Map<String, Map<Integer, MetricSnapshot>> data = metricInfo.get_metrics();
+
+ Map<String, Map<Integer, MetricSnapshot>> part = Maps.newHashMapWithExpectedSize(MAX_BATCH_SIZE);
+ MetricInfo uploadPart = new MetricInfo();
+ int i = 0;
+ for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : data.entrySet()) {
+ part.put(entry.getKey(), entry.getValue());
+ if (++i >= MAX_BATCH_SIZE) {
+ uploadPart.set_metrics(part);
+ doUpload(client, topologyId, uploadPart, metaType);
+
+ i = 0;
+ part.clear();
+ }
+ }
+ if (part.size() > 0) {
+ uploadPart.set_metrics(part);
+ doUpload(client, topologyId, uploadPart, metaType);
+ }
+ } else {
+ doUpload(client, topologyId, metricInfo, metaType);
+ }
+ }
+
+ private void doUpload(Nimbus.Client client, String topologyId, MetricInfo part, MetaType metaType) {
+ try {
+ if (metaType == MetaType.TASK) {
+ client.uploadTopologyMetrics(topologyId,
+ new TopologyMetric(dummy, dummy, dummy, part, dummy, dummy));
+ } else if (metaType == MetaType.STREAM) {
+ client.uploadTopologyMetrics(topologyId,
+ new TopologyMetric(dummy, dummy, dummy, dummy, part, dummy));
+ } else if (metaType == MetaType.WORKER) {
+ client.uploadTopologyMetrics(topologyId,
+ new TopologyMetric(dummy, dummy, part, dummy, dummy, dummy));
+ } else if (metaType == MetaType.NETTY) {
+ client.uploadTopologyMetrics(topologyId,
+ new TopologyMetric(dummy, dummy, dummy, dummy, dummy, part));
+ }
+ } catch (Exception ex) {
+ LOG.error("Error", ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java
index 4a4a72b..8f0138a 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java
@@ -1,4 +1,4 @@
-/**
+package com.alibaba.jstorm.utils; /**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java
index 17b7885..161156b 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/DisruptorRunable.java
@@ -17,67 +17,59 @@
*/
package com.alibaba.jstorm.utils;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.utils.DisruptorQueue;
-
import com.alibaba.jstorm.callback.AsyncLoopRunnable;
import com.alibaba.jstorm.callback.RunnableCallback;
-import com.alibaba.jstorm.common.metric.QueueGauge;
-import com.alibaba.jstorm.common.metric.Timer;
-import com.alibaba.jstorm.metric.JStormHealthCheck;
+import com.alibaba.jstorm.common.metric.*;
import com.alibaba.jstorm.metric.JStormMetrics;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.metric.JStormHealthCheck;
import com.alibaba.jstorm.metric.MetricDef;
import com.lmax.disruptor.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
//import com.alibaba.jstorm.message.zeroMq.ISendConnection;
/**
- *
* Disruptor Consumer thread
*
* @author yannian
- *
*/
-public abstract class DisruptorRunable extends RunnableCallback implements
- EventHandler {
- private final static Logger LOG = LoggerFactory
- .getLogger(DisruptorRunable.class);
+public abstract class DisruptorRunable extends RunnableCallback implements EventHandler {
+ private final static Logger LOG = LoggerFactory.getLogger(DisruptorRunable.class);
protected DisruptorQueue queue;
protected String idStr;
- protected Timer timer;
+ protected AsmHistogram timer;
protected AtomicBoolean shutdown = AsyncLoopRunnable.getShutdown();
public DisruptorRunable(DisruptorQueue queue, String idStr) {
this.queue = queue;
- this.timer =
- JStormMetrics.registerWorkerTimer(idStr + MetricDef.TIME_TYPE);
this.idStr = idStr;
- QueueGauge queueGauge =
- new QueueGauge(idStr + MetricDef.QUEUE_TYPE, queue);
- JStormMetrics.registerWorkerGauge(queueGauge, idStr
- + MetricDef.QUEUE_TYPE);
+ this.timer =
+ (AsmHistogram) JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName(idStr + MetricDef.TIME_TYPE, MetricType.HISTOGRAM),
+ new AsmHistogram());
+
+ QueueGauge queueGauge = new QueueGauge(queue, idStr, MetricDef.QUEUE_TYPE);
+ JStormMetrics.registerWorkerMetric(MetricUtils.workerMetricName(idStr + MetricDef.QUEUE_TYPE, MetricType.GAUGE), new AsmGauge(queueGauge));
JStormHealthCheck.registerWorkerHealthCheck(idStr, queueGauge);
}
- public abstract void handleEvent(Object event, boolean endOfBatch)
- throws Exception;
+ public abstract void handleEvent(Object event, boolean endOfBatch) throws Exception;
/**
* This function need to be implements
*
- * @see com.lmax.disruptor.EventHandler#onEvent(java.lang.Object, long,
- * boolean)
+ * @see EventHandler#onEvent(Object, long, boolean)
*/
@Override
- public void onEvent(Object event, long sequence, boolean endOfBatch)
- throws Exception {
+ public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
if (event == null) {
return;
}
@@ -87,7 +79,7 @@ public abstract class DisruptorRunable extends RunnableCallback implements
handleEvent(event, endOfBatch);
} finally {
long end = System.nanoTime();
- timer.update((end - start)/1000000.0d);
+ timer.update((end - start) / TimeUtils.NS_PER_US);
}
}
@@ -96,17 +88,15 @@ public abstract class DisruptorRunable extends RunnableCallback implements
LOG.info("Successfully start thread " + idStr);
queue.consumerStarted();
- while (shutdown.get() == false) {
+ while (!shutdown.get()) {
queue.consumeBatchWhenAvailable(this);
-
}
-
LOG.info("Successfully exit thread " + idStr);
}
@Override
public void shutdown() {
- JStormMetrics.unregisterWorkerMetric(idStr + MetricDef.QUEUE_TYPE);
+ JStormMetrics.unregisterWorkerMetric(MetricUtils.workerMetricName(idStr + MetricDef.QUEUE_TYPE, MetricType.GAUGE));
JStormHealthCheck.unregisterWorkerHealthCheck(idStr);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java
index 8c62d6f..2773963 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/EPlatform.java
@@ -17,34 +17,18 @@
*/
package com.alibaba.jstorm.utils;
-public enum EPlatform {
- Any("any"),
- Linux("Linux"),
- Mac_OS("Mac OS"),
- Mac_OS_X("Mac OS X"),
- Windows("Windows"),
- OS2("OS/2"),
- Solaris("Solaris"),
- SunOS("SunOS"),
- MPEiX("MPE/iX"),
- HP_UX("HP-UX"),
- AIX("AIX"),
- OS390("OS/390"),
- FreeBSD("FreeBSD"),
- Irix("Irix"),
- Digital_Unix("Digital Unix"),
- NetWare_411("NetWare"),
- OSF1("OSF1"),
- OpenVMS("OpenVMS"),
- Others("Others");
-
- private EPlatform(String desc){
- this.description = desc;
- }
-
- public String toString(){
- return description;
- }
-
- private String description;
-}
+public enum EPlatform {
+ Any("any"), Linux("Linux"), Mac_OS("Mac OS"), Mac_OS_X("Mac OS X"), Windows("Windows"), OS2("OS/2"), Solaris("Solaris"), SunOS("SunOS"), MPEiX("MPE/iX"), HP_UX(
+ "HP-UX"), AIX("AIX"), OS390("OS/390"), FreeBSD("FreeBSD"), Irix("Irix"), Digital_Unix("Digital Unix"), NetWare_411("NetWare"), OSF1("OSF1"), OpenVMS(
+ "OpenVMS"), Others("Others");
+
+ private EPlatform(String desc) {
+ this.description = desc;
+ }
+
+ public String toString() {
+ return description;
+ }
+
+ private String description;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java
index e33167a..7099169 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java
@@ -77,8 +77,7 @@ public class FileAttribute implements Serializable, JSONAware {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
@Override
@@ -122,8 +121,7 @@ public class FileAttribute implements Serializable, JSONAware {
String jsonString = JStormUtils.to_json(map);
- Map<String, Map> map2 =
- (Map<String, Map>) JStormUtils.from_json(jsonString);
+ Map<String, Map> map2 = (Map<String, Map>) JStormUtils.from_json(jsonString);
Map jObject = map2.get("test");
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
index 20c1f7a..378ee26 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
@@ -29,8 +29,7 @@ public class HttpserverUtils {
public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_JSTACK = "jstack";
- public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF =
- "showConf";
+ public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF = "showConf";
public static final String HTTPSERVER_LOGVIEW_PARAM_LOGFILE = "log";
@@ -38,8 +37,7 @@ public class HttpserverUtils {
public static final String HTTPSERVER_LOGVIEW_PARAM_DIR = "dir";
- public static final String HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT =
- "workerPort";
+ public static final String HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT = "workerPort";
public static final String HTTPSERVER_LOGVIEW_PARAM_SIZE_FORMAT = "%016d\n";
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java
index 992659c..de7b504 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java
@@ -1,83 +1,74 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.utils;
-
-import java.io.Serializable;
-
-public class IntervalCheck implements Serializable {
-
- /**
- *
- */
- private static final long serialVersionUID = 8952971673547362883L;
-
- long lastCheck = System.currentTimeMillis();
-
- // default interval is 1 second
- long interval = 1000;
-
- /*
- * if last check time is before interval seconds, return true, otherwise
- * return false
- */
- public boolean check() {
- return checkAndGet() != null;
- }
-
- /**
- *
- * @return
- */
- public Double checkAndGet() {
- long now = System.currentTimeMillis();
-
- synchronized (this) {
- if (now >= interval + lastCheck) {
- double pastSecond = ((double) (now - lastCheck)) / 1000;
- lastCheck = now;
- return pastSecond;
- }
- }
-
- return null;
- }
-
- public long getInterval() {
- return interval/1000;
- }
-
- public long getIntervalMs() {
- return interval;
- }
-
- public void setInterval(long interval) {
- this.interval = interval * 1000;
- }
-
- public void setIntervalMs(long interval) {
- this.interval = interval;
- }
-
- public void adjust(long addTimeMillis) {
- lastCheck += addTimeMillis;
- }
-
- public void start() {
- lastCheck = System.currentTimeMillis();
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.utils;
+
+import java.io.Serializable;
+
+public class IntervalCheck implements Serializable {
+ private static final long serialVersionUID = 8952971673547362883L;
+
+ long lastCheck = System.currentTimeMillis();
+
+ // default interval is 1 second
+ long interval = 1000;
+
+ /*
+ * if last check time is before interval seconds, return true, otherwise return false
+ */
+ public boolean check() {
+ return checkAndGet() != null;
+ }
+
+ public Double checkAndGet() {
+ long now = System.currentTimeMillis();
+
+ synchronized (this) {
+ if (now >= interval + lastCheck) {
+ double pastSecond = ((double) (now - lastCheck)) / 1000;
+ lastCheck = now;
+ return pastSecond;
+ }
+ }
+
+ return null;
+ }
+
+ public long getInterval() {
+ return interval / 1000;
+ }
+
+ public long getIntervalMs() {
+ return interval;
+ }
+
+ public void setInterval(long interval) {
+ this.interval = interval * 1000;
+ }
+
+ public void setIntervalMs(long interval) {
+ this.interval = interval;
+ }
+
+ public void adjust(long addTimeMillis) {
+ lastCheck += addTimeMillis;
+ }
+
+ public void start() {
+ lastCheck = System.currentTimeMillis();
+ }
+}