You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:49 UTC
[11/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java
index da69070..0cc16e4 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerData.java
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -17,27 +18,6 @@
*/
package com.alibaba.jstorm.daemon.worker;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URL;
-import java.security.InvalidParameterException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
@@ -48,28 +28,38 @@ import backtype.storm.scheduler.WorkerSlot;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
-
import com.alibaba.jstorm.callback.AsyncLoopDefaultKill;
+import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.Cluster;
-import com.alibaba.jstorm.cluster.ClusterState;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.common.metric.window.Metric;
+import com.alibaba.jstorm.cluster.*;
+import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.common.metric.AsmMetric;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger;
import com.alibaba.jstorm.message.netty.ControlMessage;
+import com.alibaba.jstorm.metric.*;
import com.alibaba.jstorm.schedule.Assignment;
-import com.alibaba.jstorm.schedule.Assignment.AssignmentType;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.TaskShutdownDameon;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.zk.ZkTool;
+import com.codahale.metrics.Gauge;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.security.InvalidParameterException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.alibaba.jstorm.schedule.Assignment.AssignmentType;
public class WorkerData {
@@ -112,6 +102,8 @@ public class WorkerData {
private volatile Set<Integer> outboundTasks;
private Set<Integer> localTasks;
+ private Set<Integer> localNodeTasks;
+
private ConcurrentHashMap<Integer, DisruptorQueue> innerTaskTransfer;
private ConcurrentHashMap<Integer, DisruptorQueue> deserializeQueues;
@@ -152,15 +144,18 @@ public class WorkerData {
private ScheduledExecutorService threadPool;
private volatile Long assignmentTS; // Assignment timeStamp. The time of
- // last update of assignment
+ // last update of assignment
+
private volatile AssignmentType assignmentType;
-
+
private IConnection recvConnection;
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public WorkerData(Map conf, IContext context, String topology_id,
- String supervisor_id, int port, String worker_id, String jar_path)
- throws Exception {
+ private JStormMetricsReporter metricReporter;
+
+ private AsyncLoopThread healthReporterThread;
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public WorkerData(Map conf, IContext context, String topology_id, String supervisor_id, int port, String worker_id, String jar_path) throws Exception {
this.conf = conf;
this.context = context;
@@ -170,7 +165,7 @@ public class WorkerData {
this.workerId = worker_id;
this.shutdown = new AtomicBoolean(false);
-
+
this.monitorEnable = new AtomicBoolean(true);
this.topologyStatus = StatusType.active;
@@ -183,29 +178,54 @@ public class WorkerData {
this.zkClusterstate = ZkTool.mk_distributed_cluster_state(conf);
this.zkCluster = Cluster.mk_storm_cluster_state(zkClusterstate);
- Map rawConf =
- StormConfig.read_supervisor_topology_conf(conf, topology_id);
+ Map rawConf = StormConfig.read_supervisor_topology_conf(conf, topology_id);
this.stormConf = new HashMap<Object, Object>();
this.stormConf.putAll(conf);
this.stormConf.putAll(rawConf);
-
+
+ JStormMetrics.setTopologyId(topology_id);
+ JStormMetrics.setPort(port);
+ JStormMetrics.setDebug(ConfigExtension.isEnableMetricDebug(stormConf));
+ JStormMetrics.setEnabled(ConfigExtension.isEnableMetrics(stormConf));
+ JStormMetrics.addDebugMetrics(ConfigExtension.getDebugMetricNames(stormConf));
+ AsmMetric.setSampleRate(ConfigExtension.getMetricSampleRate(stormConf));
+
ConfigExtension.setLocalSupervisorId(stormConf, supervisorId);
ConfigExtension.setLocalWorkerId(stormConf, workerId);
ConfigExtension.setLocalWorkerPort(stormConf, port);
ControlMessage.setPort(port);
- Metric.setEnable(ConfigExtension.isEnablePerformanceMetrics(stormConf));
+
+ JStormMetrics.registerWorkerTopologyMetric(
+ JStormMetrics.workerMetricName(MetricDef.CPU_USED_RATIO, MetricType.GAUGE),
+ new AsmGauge(new Gauge<Double>() {
+ @Override
+ public Double getValue() {
+ return JStormUtils.getCpuUsage();
+ }
+ }));
+
+ JStormMetrics.registerWorkerTopologyMetric(JStormMetrics.workerMetricName(MetricDef.MEMORY_USED, MetricType.GAUGE),
+ new AsmGauge(new Gauge<Double>() {
+ @Override
+ public Double getValue() {
+ return JStormUtils.getMemUsage();
+ }
+ }));
+
+ JStormMetrics.registerWorkerMetric(JStormMetrics.workerMetricName(MetricDef.DISK_USAGE, MetricType.GAUGE), new AsmGauge(new Gauge<Double>() {
+ @Override
+ public Double getValue() {
+ return JStormUtils.getDiskUsage();
+ }
+ }));
LOG.info("Worker Configuration " + stormConf);
try {
+ boolean enableClassloader = ConfigExtension.isEnableTopologyClassLoader(stormConf);
+ boolean enableDebugClassloader = ConfigExtension.isEnableClassloaderDebug(stormConf);
- boolean enableClassloader =
- ConfigExtension.isEnableTopologyClassLoader(stormConf);
- boolean enableDebugClassloader =
- ConfigExtension.isEnableClassloaderDebug(stormConf);
-
- if (jar_path == null && enableClassloader == true
- && !conf.get(Config.STORM_CLUSTER_MODE).equals("local")) {
+ if (jar_path == null && enableClassloader == true && !conf.get(Config.STORM_CLUSTER_MODE).equals("local")) {
LOG.error("enable classloader, but not app jar");
throw new InvalidParameterException();
}
@@ -221,14 +241,11 @@ public class WorkerData {
urls.add(url);
}
urlArray = urls.toArray(new URL[0]);
-
}
- WorkerClassLoader.mkInstance(urlArray, ClassLoader
- .getSystemClassLoader(), ClassLoader.getSystemClassLoader()
- .getParent(), enableClassloader, enableDebugClassloader);
+ WorkerClassLoader.mkInstance(urlArray, ClassLoader.getSystemClassLoader(), ClassLoader.getSystemClassLoader().getParent(), enableClassloader,
+ enableDebugClassloader);
} catch (Exception e) {
- // TODO Auto-generated catch block
LOG.error("init jarClassLoader error!", e);
throw new InvalidParameterException();
}
@@ -237,39 +254,27 @@ public class WorkerData {
this.context = TransportFactory.makeContext(stormConf);
}
- boolean disruptorUseSleep =
- ConfigExtension.isDisruptorUseSleep(stormConf);
+ boolean disruptorUseSleep = ConfigExtension.isDisruptorUseSleep(stormConf);
DisruptorQueue.setUseSleep(disruptorUseSleep);
- boolean isLimited =
- ConfigExtension.getTopologyBufferSizeLimited(stormConf);
+ boolean isLimited = ConfigExtension.getTopologyBufferSizeLimited(stormConf);
DisruptorQueue.setLimited(isLimited);
- LOG.info("Disruptor use sleep:" + disruptorUseSleep + ", limited size:"
- + isLimited);
+ LOG.info("Disruptor use sleep:" + disruptorUseSleep + ", limited size:" + isLimited);
// this.transferQueue = new LinkedBlockingQueue<TransferData>();
- int buffer_size =
- Utils.getInt(conf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE));
- WaitStrategy waitStrategy =
- (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(conf);
- this.transferQueue =
- DisruptorQueue.mkInstance("TotalTransfer", ProducerType.MULTI,
- buffer_size, waitStrategy);
+ int buffer_size = Utils.getInt(stormConf.get(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE));
+ WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(stormConf);
+ this.transferQueue = DisruptorQueue.mkInstance("TotalTransfer", ProducerType.MULTI, buffer_size, waitStrategy);
this.transferQueue.consumerStarted();
- this.sendingQueue =
- DisruptorQueue.mkInstance("TotalSending", ProducerType.MULTI,
- buffer_size, waitStrategy);
+ this.sendingQueue = DisruptorQueue.mkInstance("TotalSending", ProducerType.MULTI, buffer_size, waitStrategy);
this.sendingQueue.consumerStarted();
this.nodeportSocket = new ConcurrentHashMap<WorkerSlot, IConnection>();
this.taskNodeport = new ConcurrentHashMap<Integer, WorkerSlot>();
this.workerToResource = new ConcurrentSkipListSet<ResourceWorkerSlot>();
- this.innerTaskTransfer =
- new ConcurrentHashMap<Integer, DisruptorQueue>();
- this.deserializeQueues =
- new ConcurrentHashMap<Integer, DisruptorQueue>();
+ this.innerTaskTransfer = new ConcurrentHashMap<Integer, DisruptorQueue>();
+ this.deserializeQueues = new ConcurrentHashMap<Integer, DisruptorQueue>();
this.tasksToComponent = new ConcurrentHashMap<Integer, String>();
- this.componentToSortedTasks =
- new ConcurrentHashMap<String, List<Integer>>();
+ this.componentToSortedTasks = new ConcurrentHashMap<String, List<Integer>>();
Assignment assignment = zkCluster.assignment_info(topologyId, null);
if (assignment == null) {
@@ -288,8 +293,7 @@ public class WorkerData {
LOG.info("Current worker taskList:" + taskids);
// deserialize topology code from local dir
- rawTopology =
- StormConfig.read_supervisor_topology_code(conf, topology_id);
+ rawTopology = StormConfig.read_supervisor_topology_code(conf, topology_id);
sysTopology = Common.system_topology(stormConf, rawTopology);
generateMaps();
@@ -301,15 +305,17 @@ public class WorkerData {
threadPool = Executors.newScheduledThreadPool(THREAD_POOL_NUM);
TimerTrigger.setScheduledExecutorService(threadPool);
+ if (!StormConfig.local_mode(stormConf)) {
+ healthReporterThread = new AsyncLoopThread(new JStormHealthReporter(this));
+ }
+
try {
- Long tmp =
- StormConfig.read_supervisor_topology_timestamp(conf,
- topology_id);
+ Long tmp = StormConfig.read_supervisor_topology_timestamp(conf, topology_id);
assignmentTS = (tmp == null ? System.currentTimeMillis() : tmp);
} catch (FileNotFoundException e) {
assignmentTS = System.currentTimeMillis();
}
-
+
outboundTasks = new HashSet<Integer>();
LOG.info("Successfully create WorkerData");
@@ -317,13 +323,10 @@ public class WorkerData {
}
/**
- * private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport; private
- * HashMap<Integer, String> tasksToComponent; private Map<String,
- * List<Integer>> componentToSortedTasks; private Map<String, Map<String,
- * Fields>> componentToStreamToFields; private Map<String, Object>
- * defaultResources; private Map<String, Object> userResources; private
- * Map<String, Object> executorData; private Map registeredMetrics;
- *
+ * private ConcurrentHashMap<Integer, WorkerSlot> taskNodeport; private HashMap<Integer, String> tasksToComponent; private Map<String, List<Integer>>
+ * componentToSortedTasks; private Map<String, Map<String, Fields>> componentToStreamToFields; private Map<String, Object> defaultResources; private
+ * Map<String, Object> userResources; private Map<String, Object> executorData; private Map registeredMetrics;
+ *
* @throws Exception
*/
private void generateMaps() throws Exception {
@@ -335,7 +338,7 @@ public class WorkerData {
this.registeredMetrics = new HashMap();
}
- public Map<Object, Object> getConf() {
+ public Map<Object, Object> getRawConf() {
return conf;
}
@@ -351,6 +354,10 @@ public class WorkerData {
this.topologyStatus = topologyStatus;
}
+ public Map<Object, Object> getConf() {
+ return stormConf;
+ }
+
public Map<Object, Object> getStormConf() {
return stormConf;
}
@@ -396,7 +403,19 @@ public class WorkerData {
}
public ConcurrentSkipListSet<ResourceWorkerSlot> getWorkerToResource() {
- return workerToResource;
+ synchronized (workerToResource) {
+ return workerToResource;
+ }
+ }
+
+ public void updateWorkerToResource(Set<ResourceWorkerSlot> workers) {
+ synchronized (workerToResource) {
+ Set<ResourceWorkerSlot> oldWorkers = workerToResource.clone();
+ oldWorkers.removeAll(workers);
+ if (oldWorkers.size() > 0)
+ workerToResource.removeAll(workers);
+ workerToResource.addAll(workers);
+ }
}
public ConcurrentHashMap<Integer, DisruptorQueue> getInnerTaskTransfer() {
@@ -471,8 +490,7 @@ public class WorkerData {
this.shutdownTasks.add(shutdownTask);
}
- public List<TaskShutdownDameon> getShutdownDaemonbyTaskIds(
- Set<Integer> taskIds) {
+ public List<TaskShutdownDameon> getShutdownDaemonbyTaskIds(Set<Integer> taskIds) {
List<TaskShutdownDameon> ret = new ArrayList<TaskShutdownDameon>();
for (TaskShutdownDameon shutdown : shutdownTasks) {
if (taskIds.contains(shutdown.getTaskId()))
@@ -494,7 +512,7 @@ public class WorkerData {
outTaskStatus.put(taskId, false);
}
}
-
+
public Map<Integer, Boolean> getOutboundTaskStatus() {
return outTaskStatus;
}
@@ -502,7 +520,7 @@ public class WorkerData {
public void addOutboundTaskStatusIfAbsent(Integer taskId) {
outTaskStatus.putIfAbsent(taskId, false);
}
-
+
public void removeOutboundTaskStatus(Integer taskId) {
outTaskStatus.remove(taskId);
}
@@ -512,8 +530,7 @@ public class WorkerData {
}
public boolean isOutboundTaskActive(Integer taskId) {
- return outTaskStatus.get(taskId) != null ? outTaskStatus.get(taskId)
- : false;
+ return outTaskStatus.get(taskId) != null ? outTaskStatus.get(taskId) : false;
}
public ScheduledExecutorService getThreadPool() {
@@ -527,11 +544,11 @@ public class WorkerData {
public Long getAssignmentTs() {
return assignmentTS;
}
-
+
public void setAssignmentType(AssignmentType type) {
this.assignmentType = type;
}
-
+
public AssignmentType getAssignmentType() {
return assignmentType;
}
@@ -544,28 +561,33 @@ public class WorkerData {
public void updateTaskIds(Assignment assignment) {
this.taskids.clear();
- this.taskids.addAll(assignment
- .getCurrentWorkerTasks(supervisorId, port));
+ this.taskids.addAll(assignment.getCurrentWorkerTasks(supervisorId, port));
+ }
+
+ public Set<Integer> getLocalNodeTasks() {
+ return localNodeTasks;
+ }
+
+ public void setLocalNodeTasks(Set<Integer> localNodeTasks) {
+ this.localNodeTasks = localNodeTasks;
}
public void setOutboundTasks(Set<Integer> outboundTasks) {
this.outboundTasks = outboundTasks;
}
-
+
public Set<Integer> getOutboundTasks() {
return outboundTasks;
}
private void updateTaskComponentMap() throws Exception {
- Map<Integer, String> tmp = Common.getTaskToComponent(
- Cluster.get_all_taskInfo(zkCluster, topologyId));
+ Map<Integer, String> tmp = Common.getTaskToComponent(Cluster.get_all_taskInfo(zkCluster, topologyId));
this.tasksToComponent.putAll(tmp);
LOG.info("Updated tasksToComponentMap:" + tasksToComponent);
this.componentToSortedTasks.putAll(JStormUtils.reverse_map(tmp));
- for (java.util.Map.Entry<String, List<Integer>> entry : componentToSortedTasks
- .entrySet()) {
+ for (Map.Entry<String, List<Integer>> entry : componentToSortedTasks.entrySet()) {
List<Integer> tasks = entry.getValue();
Collections.sort(tasks);
@@ -573,16 +595,13 @@ public class WorkerData {
}
private void updateStormTopology() {
- StormTopology rawTmp = null;
- StormTopology sysTmp = null;
-
+ StormTopology rawTmp;
+ StormTopology sysTmp;
try {
- rawTmp =
- StormConfig.read_supervisor_topology_code(conf, topologyId);
+ rawTmp = StormConfig.read_supervisor_topology_code(conf, topologyId);
sysTmp = Common.system_topology(stormConf, rawTopology);
} catch (IOException e) {
- LOG.error("Failed to read supervisor topology code for "
- + topologyId, e);
+ LOG.error("Failed to read supervisor topology code for " + topologyId, e);
return;
} catch (InvalidTopologyException e) {
LOG.error("Failed to update sysTopology for " + topologyId, e);
@@ -593,8 +612,7 @@ public class WorkerData {
updateTopology(sysTopology, sysTmp);
}
- private void updateTopology(StormTopology oldTopology,
- StormTopology newTopology) {
+ private void updateTopology(StormTopology oldTopology, StormTopology newTopology) {
oldTopology.set_bolts(newTopology.get_bolts());
oldTopology.set_spouts(newTopology.get_spouts());
oldTopology.set_state_spouts(newTopology.get_state_spouts());
@@ -604,12 +622,19 @@ public class WorkerData {
return monitorEnable;
}
- public IConnection getRecvConnection() {
- return recvConnection;
- }
+ public IConnection getRecvConnection() {
+ return recvConnection;
+ }
- public void setRecvConnection(IConnection recvConnection) {
- this.recvConnection = recvConnection;
- }
+ public void setRecvConnection(IConnection recvConnection) {
+ this.recvConnection = recvConnection;
+ }
+ public JStormMetricsReporter getMetricsReporter() {
+ return metricReporter;
+ }
+
+ public void setMetricsReporter(JStormMetricsReporter metricReporter) {
+ this.metricReporter = metricReporter;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java
index d8ec622..9d1cca7 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerHeartbeat.java
@@ -25,8 +25,7 @@ import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
/**
- * Worker's Heartbeat data woker will update the object to
- * /LOCAL-DIR/workers/${woker-id}/heartbeats
+ * Worker's Heartbeat data woker will update the object to /LOCAL-DIR/workers/${woker-id}/heartbeats
*
* @author yannian/Longda
*
@@ -39,8 +38,7 @@ public class WorkerHeartbeat implements Serializable {
private Set<Integer> taskIds;
private Integer port;
- public WorkerHeartbeat(int timeSecs, String topologyId,
- Set<Integer> taskIds, Integer port) {
+ public WorkerHeartbeat(int timeSecs, String topologyId, Set<Integer> taskIds, Integer port) {
this.timeSecs = timeSecs;
this.topologyId = topologyId;
@@ -83,7 +81,6 @@ public class WorkerHeartbeat implements Serializable {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java
new file mode 100644
index 0000000..79075bc
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerReportError.java
@@ -0,0 +1,38 @@
+package com.alibaba.jstorm.daemon.worker;
+
+import com.alibaba.jstorm.cluster.StormClusterState;
+import com.alibaba.jstorm.utils.TimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.Set;
+
+/**
+ * Created by xiaojian.fxj on 2015/8/12.
+ */
+public class WorkerReportError {
+ private static Logger LOG = LoggerFactory.getLogger(WorkerReportError.class);
+ private StormClusterState zkCluster;
+ private String hostName;
+
+ public WorkerReportError(StormClusterState _storm_cluster_state,
+ String _hostName) {
+ this.zkCluster = _storm_cluster_state;
+ this.hostName = _hostName;
+ }
+ public void report(String topology_id, Integer worker_port,
+ Set<Integer> tasks, String error) {
+ // Report worker's error to zk
+ try {
+ Date now = new Date();
+ String nowStr = TimeFormat.getSecond(now);
+ String errorInfo = error + "on " + this.hostName + ":" + worker_port + "," + nowStr;
+ for (Integer task : tasks){
+ zkCluster.report_task_error(topology_id, task, errorInfo, null);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed update "+worker_port+ "errors to ZK" + "\n", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java
index 0691fee..403c8cf 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/WorkerShutdown.java
@@ -86,14 +86,13 @@ public class WorkerShutdown implements ShutdownableDameon {
LOG.info("Worker has been shutdown already");
return;
}
-
- if(recvConnection != null) {
- recvConnection.close();
+
+ if (recvConnection != null) {
+ recvConnection.close();
}
AsyncLoopRunnable.getShutdown().set(true);
threadPool.shutdown();
-
// shutdown tasks
for (ShutdownableDameon task : shutdowntasks) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java
index 783f584..ab4213f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/SyncContainerHb.java
@@ -35,8 +35,7 @@ import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
public class SyncContainerHb extends RunnableCallback {
- private final static Logger LOG = LoggerFactory
- .getLogger(SyncContainerHb.class);
+ private final static Logger LOG = LoggerFactory.getLogger(SyncContainerHb.class);
private String readDir;
private String writeDir;
@@ -113,8 +112,7 @@ public class SyncContainerHb extends RunnableCallback {
try {
hb = Long.valueOf(biggest);
} catch (Exception e) {
- LOG.info("Heartbeat file " + biggest
- + " isn't a valid file, remove it");
+ LOG.info("Heartbeat file " + biggest + " isn't a valid file, remove it");
String path = readDir + File.separator + biggest;
try {
@@ -151,8 +149,7 @@ public class SyncContainerHb extends RunnableCallback {
return;
}
- String seconds =
- String.valueOf(System.currentTimeMillis() / SECOND_MILLISCOND);
+ String seconds = String.valueOf(System.currentTimeMillis() / SECOND_MILLISCOND);
String path = writeDir + File.separator + seconds;
@@ -289,8 +286,7 @@ public class SyncContainerHb extends RunnableCallback {
this.reserverNum = reserverNum;
}
- public static AsyncLoopThread mkInstance(String containerHbDir,
- String hbDir, int timeout, int frequence) {
+ public static AsyncLoopThread mkInstance(String containerHbDir, String hbDir, int timeout, int frequence) {
SyncContainerHb syncContainerHbThread = new SyncContainerHb();
syncContainerHbThread.setReadDir(containerHbDir);
@@ -306,9 +302,7 @@ public class SyncContainerHb extends RunnableCallback {
sb.append(",frequence:").append(frequence);
LOG.info(sb.toString());
- AsyncLoopThread thread =
- new AsyncLoopThread(syncContainerHbThread, true,
- Thread.NORM_PRIORITY, true);
+ AsyncLoopThread thread = new AsyncLoopThread(syncContainerHbThread, true, Thread.NORM_PRIORITY, true);
return thread;
}
@@ -329,31 +323,23 @@ public class SyncContainerHb extends RunnableCallback {
}
- public static AsyncLoopThread mkSupervisorInstance(Map conf)
- throws IOException {
- boolean isEnableContainer =
- ConfigExtension.isEnableContainerSupervisor();
+ public static AsyncLoopThread mkSupervisorInstance(Map conf) throws IOException {
+ boolean isEnableContainer = ConfigExtension.isEnableContainerSupervisor();
if (isEnableContainer) {
- String containerHbDir =
- ConfigExtension.getContainerSupervisorHearbeat();
+ String containerHbDir = ConfigExtension.getContainerSupervisorHearbeat();
String hbDir = StormConfig.supervisorHearbeatForContainer(conf);
- int timeout =
- ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
- int frequence =
- ConfigExtension.getContainerHeartbeatFrequence(conf);
+ int timeout = ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
+ int frequence = ConfigExtension.getContainerHeartbeatFrequence(conf);
return mkInstance(containerHbDir, hbDir, timeout, frequence);
}
- boolean isWorkerAutomaticStop =
- ConfigExtension.isWorkerStopWithoutSupervisor(conf);
+ boolean isWorkerAutomaticStop = ConfigExtension.isWorkerStopWithoutSupervisor(conf);
if (isWorkerAutomaticStop) {
String containerHbDir = null;
String hbDir = StormConfig.supervisorHearbeatForContainer(conf);
- int timeout =
- ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
- int frequence =
- ConfigExtension.getContainerHeartbeatFrequence(conf);
+ int timeout = ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
+ int frequence = ConfigExtension.getContainerHeartbeatFrequence(conf);
return mkInstance(containerHbDir, hbDir, timeout, frequence);
}
@@ -364,17 +350,14 @@ public class SyncContainerHb extends RunnableCallback {
}
public static AsyncLoopThread mkWorkerInstance(Map conf) throws IOException {
- boolean isEnableContainer =
- ConfigExtension.isEnableContainerSupervisor();
- boolean isWorkerAutomaticStop =
- ConfigExtension.isWorkerStopWithoutSupervisor(conf);
+ boolean isEnableContainer = ConfigExtension.isEnableContainerSupervisor();
+ boolean isWorkerAutomaticStop = ConfigExtension.isWorkerStopWithoutSupervisor(conf);
if (isEnableContainer == false && isWorkerAutomaticStop == false) {
LOG.info("Run worker without Apsara/Yarn container");
return null;
}
- String containerHbDir =
- StormConfig.supervisorHearbeatForContainer(conf);
+ String containerHbDir = StormConfig.supervisorHearbeatForContainer(conf);
String hbDir = null;
int timeout = ConfigExtension.getContainerHeartbeatTimeoutSeconds(conf);
int frequence = ConfigExtension.getContainerHeartbeatFrequence(conf);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java
index 74aca05..51e74f8 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/hearbeat/WorkerHeartbeatRunable.java
@@ -44,8 +44,7 @@ import com.alibaba.jstorm.utils.TimeUtils;
*
*/
public class WorkerHeartbeatRunable extends RunnableCallback {
- private static Logger LOG = LoggerFactory
- .getLogger(WorkerHeartbeatRunable.class);
+ private static Logger LOG = LoggerFactory.getLogger(WorkerHeartbeatRunable.class);
private WorkerData workerData;
@@ -69,8 +68,7 @@ public class WorkerHeartbeatRunable extends RunnableCallback {
this.worker_id = workerData.getWorkerId();
this.port = workerData.getPort();
this.topologyId = workerData.getTopologyId();
- this.task_ids =
- new CopyOnWriteArraySet<Integer>(workerData.getTaskids());
+ this.task_ids = new CopyOnWriteArraySet<Integer>(workerData.getTaskids());
this.shutdown = workerData.getShutdown();
String key = Config.WORKER_HEARTBEAT_FREQUENCY_SECS;
@@ -97,11 +95,9 @@ public class WorkerHeartbeatRunable extends RunnableCallback {
public void doHeartbeat() throws IOException {
int currtime = TimeUtils.current_time_secs();
- WorkerHeartbeat hb =
- new WorkerHeartbeat(currtime, topologyId, task_ids, port);
+ WorkerHeartbeat hb = new WorkerHeartbeat(currtime, topologyId, task_ids, port);
- LOG.debug("Doing heartbeat:" + worker_id + ",port:" + port + ",hb"
- + hb.toString());
+ LOG.debug("Doing heartbeat:" + worker_id + ",port:" + port + ",hb" + hb.toString());
LocalState state = getWorkerState();
state.put(Common.LS_WORKER_HEARTBEAT, hb);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/BackpressureCheckTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/BackpressureCheckTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/BackpressureCheckTrigger.java
new file mode 100644
index 0000000..19ba2c9
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/BackpressureCheckTrigger.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.daemon.worker.timer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.jstorm.task.backpressure.BackpressureTrigger;
+
+
+public class BackpressureCheckTrigger extends TimerTrigger{
+ private static final Logger LOG = LoggerFactory.getLogger(BackpressureCheckTrigger.class);
+
+ private BackpressureTrigger trigger;
+
+ public BackpressureCheckTrigger(int initDelay, int frequence, String name, BackpressureTrigger trigger) {
+ if (frequence <= 0) {
+ LOG.warn(" The frequence of " + name + " is invalid");
+ frequence = 1;
+ }
+ this.firstTime = initDelay;
+ this.frequence = frequence;
+ this.trigger = trigger;
+ }
+
+ @Override
+ public void run() {
+ try {
+ trigger.checkAndTrigger();
+ } catch (Exception e) {
+ LOG.warn("Failed to publish timer event to " + name, e);
+ return;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java
index 5a59e6f..a1a0990 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/RotatingMapTrigger.java
@@ -29,26 +29,20 @@ import com.alibaba.jstorm.task.acker.Acker;
import com.alibaba.jstorm.utils.JStormUtils;
public class RotatingMapTrigger extends TimerTrigger {
- private static final Logger LOG = LoggerFactory
- .getLogger(RotatingMapTrigger.class);
+ private static final Logger LOG = LoggerFactory.getLogger(RotatingMapTrigger.class);
public RotatingMapTrigger(Map conf, String name, DisruptorQueue queue) {
this.name = name;
this.queue = queue;
this.opCode = TimerConstants.ROTATING_MAP;
- int msgTimeOut =
- JStormUtils.parseInt(
- conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30);
+ int msgTimeOut = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30);
frequence = (msgTimeOut) / (Acker.TIMEOUT_BUCKET_NUM - 1);
if (frequence <= 0) {
frequence = 1;
}
- firstTime =
- JStormUtils.parseInt(
- conf.get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS),
- 120);
+ firstTime = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS), 120);
firstTime += frequence;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchCheckTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchCheckTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchCheckTrigger.java
new file mode 100644
index 0000000..0e00f66
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchCheckTrigger.java
@@ -0,0 +1,32 @@
+package com.alibaba.jstorm.daemon.worker.timer;
+
+import org.apache.log4j.Logger;
+
+import com.alibaba.jstorm.task.TaskBatchTransfer;
+
+public class TaskBatchCheckTrigger extends TimerTrigger {
+ private static final Logger LOG = Logger.getLogger(TaskBatchCheckTrigger.class);
+
+ private TaskBatchTransfer batchTransfer;
+
+ public TaskBatchCheckTrigger(int frequence, String name, TaskBatchTransfer transfer) {
+ if (frequence <= 0) {
+ LOG.warn(" The frequence of " + name + " is invalid");
+ frequence = 1;
+ }
+ this.firstTime = frequence;
+ this.frequence = frequence;
+ this.batchTransfer = transfer;
+ }
+
+ @Override
+ public void run() {
+ try {
+ batchTransfer.startCheck();
+ } catch (Exception e) {
+ LOG.warn("Failed to publish timer event to " + name, e);
+ return;
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java
index 3a5353f..96165d0 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskBatchFlushTrigger.java
@@ -22,9 +22,9 @@ import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.task.TaskBatchTransfer;
-public class TaskBatchFlushTrigger extends TimerTrigger{
+public class TaskBatchFlushTrigger extends TimerTrigger {
private static final Logger LOG = LoggerFactory.getLogger(TickTupleTrigger.class);
-
+
private TaskBatchTransfer batchTransfer;
public TaskBatchFlushTrigger(int frequence, String name, TaskBatchTransfer transfer) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java
index 0b67776..ad81a2b 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TaskHeartbeatTrigger.java
@@ -17,6 +17,7 @@
*/
package com.alibaba.jstorm.daemon.worker.timer;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -24,33 +25,66 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.Config;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleExt;
+import backtype.storm.tuple.TupleImplExt;
+import backtype.storm.tuple.Values;
import backtype.storm.utils.DisruptorQueue;
-import com.alibaba.jstorm.daemon.worker.timer.TimerTrigger.TimerEvent;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.task.error.ITaskReportErr;
+import com.alibaba.jstorm.task.UptimeComputer;
+import com.alibaba.jstorm.utils.IntervalCheck;
import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
public class TaskHeartbeatTrigger extends TimerTrigger {
- private static final Logger LOG = LoggerFactory
- .getLogger(TaskHeartbeatTrigger.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TaskHeartbeatTrigger.class);
private int taskId;
-
- private BlockingQueue<Object> controlQueue;
+ private String componentId;
+ private TopologyContext sysTopologyCtx;
- public TaskHeartbeatTrigger(Map conf, String name, DisruptorQueue queue,
- BlockingQueue<Object> controlQueue, int taskId) {
+ private BlockingQueue<Object> controlQueue = null;
+
+ private OutputCollector boltOutputCollector = null;
+ private SpoutOutputCollector spoutOutputCollector = null;
+
+ private long executeThreadHbTime;
+ private int taskHbTimeout;
+
+ private ITaskReportErr reportError;
+
+ private IntervalCheck intervalCheck;
+
+ private UptimeComputer uptime;
+
+ public TaskHeartbeatTrigger(Map conf, String name, DisruptorQueue queue, BlockingQueue<Object> controlQueue, int taskId, String componentId,
+ TopologyContext sysTopologyCtx, ITaskReportErr reportError) {
this.name = name;
this.queue = queue;
this.controlQueue = controlQueue;
this.opCode = TimerConstants.TASK_HEARTBEAT;
this.taskId = taskId;
+ this.componentId = componentId;
+ this.sysTopologyCtx = sysTopologyCtx;
+
+ this.frequence = JStormUtils.parseInt(conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10);
+ this.firstTime = frequence;
- frequence =
- JStormUtils.parseInt(
- conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS), 10);
+ this.executeThreadHbTime = TimeUtils.current_time_secs();
+ this.taskHbTimeout = JStormUtils.parseInt(conf.get(Config.NIMBUS_TASK_TIMEOUT_SECS), 180);
+ this.intervalCheck = new IntervalCheck();
+ this.intervalCheck.setInterval(taskHbTimeout);
+ this.intervalCheck.start();
- firstTime = frequence;
+ this.reportError = reportError;
+
+ this.uptime = new UptimeComputer();
}
@Override
@@ -60,7 +94,6 @@ public class TaskHeartbeatTrigger extends TimerTrigger {
@Override
public void run() {
-
try {
updateObject();
@@ -69,16 +102,63 @@ public class TaskHeartbeatTrigger extends TimerTrigger {
return;
}
+ if (intervalCheck.check()) {
+ checkExecuteThreadHb();
+ }
+
+ if (componentId.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
+ Values values = new Values(uptime.uptime());
+ TupleExt tuple = new TupleImplExt(sysTopologyCtx, values, taskId, Common.TOPOLOGY_MASTER_HB_STREAM_ID);
+ queue.publish(tuple);
+ } else {
+ // Send task heartbeat to topology master
+ sendHbMsg();
+ }
+
+ // Send message used to monitor execute thread
TimerEvent event = new TimerEvent(opCode, object);
-
- controlQueue.offer(event);
- LOG.debug("Offer task HB event to controlQueue, taskId=" + taskId);
+ boolean ret = controlQueue.offer(event);
+ if (ret)
+ LOG.debug("Offer task HB event to controlQueue, taskId=" + taskId);
+ else
+ LOG.debug("Failed to offer task HB event to controlQueue, taskId=" + taskId);
} catch (Exception e) {
- LOG.warn("Failed to public timer event to " + name, e);
+ LOG.warn("Failed to publish timer event to " + name, e);
return;
}
LOG.debug(" Trigger timer event to " + name);
}
+
+ public void setSpoutOutputCollector(SpoutOutputCollector spoutOutputCollector) {
+ this.spoutOutputCollector = spoutOutputCollector;
+ }
+
+ public void setBoltOutputCollector(OutputCollector boltOutputCollector) {
+ this.boltOutputCollector = boltOutputCollector;
+ }
+
+ public void setExeThreadHbTime(long hbTime) {
+ this.executeThreadHbTime = hbTime;
+ }
+
+ private void sendHbMsg() {
+ List values = JStormUtils.mk_list(uptime.uptime());
+ if (spoutOutputCollector != null) {
+ spoutOutputCollector.emit(Common.TOPOLOGY_MASTER_HB_STREAM_ID, values);
+ } else if (boltOutputCollector != null) {
+ boltOutputCollector.emit(Common.TOPOLOGY_MASTER_HB_STREAM_ID, values);
+ } else {
+ LOG.warn("Failed to send heartbeat msg. OutputCollector has not been initialized!");
+ }
+ }
+
+ private void checkExecuteThreadHb() {
+ long currentTime = TimeUtils.current_time_secs();
+ if (currentTime - executeThreadHbTime > taskHbTimeout) {
+ String error = "No response from Task-" + taskId + ", last report time(sec) is " + executeThreadHbTime;
+ reportError.report(error);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java
index ecf01c5..a70d8ab 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TickTupleTrigger.java
@@ -29,13 +29,11 @@ import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.utils.TimeUtils;
public class TickTupleTrigger extends TimerTrigger {
- private static final Logger LOG = LoggerFactory
- .getLogger(TickTupleTrigger.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TickTupleTrigger.class);
TopologyContext topologyContext;
- public TickTupleTrigger(TopologyContext topologyContext, int frequence,
- String name, DisruptorQueue queue) {
+ public TickTupleTrigger(TopologyContext topologyContext, int frequence, String name, DisruptorQueue queue) {
this.name = name;
this.queue = queue;
this.opCode = TimerConstants.TICK_TUPLE;
@@ -53,10 +51,7 @@ public class TickTupleTrigger extends TimerTrigger {
@Override
public void updateObject() {
this.object =
- new TupleImplExt(topologyContext, new Values(
- TimeUtils.current_time_secs()),
- (int) Constants.SYSTEM_TASK_ID,
- Constants.SYSTEM_TICK_STREAM_ID);
+ new TupleImplExt(topologyContext, new Values(TimeUtils.current_time_secs()), (int) Constants.SYSTEM_TASK_ID, Constants.SYSTEM_TICK_STREAM_ID);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java
index 4cecbea..2c2c39c 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/worker/timer/TimerTrigger.java
@@ -30,13 +30,11 @@ import backtype.storm.utils.DisruptorQueue;
import com.lmax.disruptor.InsufficientCapacityException;
public class TimerTrigger implements Runnable {
- private static final Logger LOG = LoggerFactory
- .getLogger(TimerTrigger.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TimerTrigger.class);
private static ScheduledExecutorService threadPool;
- public static void setScheduledExecutorService(
- ScheduledExecutorService scheduledExecutorService) {
+ public static void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
threadPool = scheduledExecutorService;
}
@@ -44,7 +42,7 @@ public class TimerTrigger implements Runnable {
protected int opCode;
protected int firstTime;
protected int frequence;
- protected DisruptorQueue queue;
+ protected DisruptorQueue queue = null;
protected Object object;
protected boolean block = true;
@@ -53,8 +51,7 @@ public class TimerTrigger implements Runnable {
}
public void register(TimeUnit timeUnit) {
- threadPool.scheduleAtFixedRate(this, firstTime, frequence,
- timeUnit);
+ threadPool.scheduleAtFixedRate(this, firstTime, frequence, timeUnit);
LOG.info("Successfully register timer " + this);
}
@@ -145,8 +142,7 @@ public class TimerTrigger implements Runnable {
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
public class TimerEvent {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java b/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java
index 5f16b0b..4d7d32c 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/ClearThread.java
@@ -31,8 +31,7 @@ import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeUtils;
public class ClearThread extends RunnableCallback {
- private static final Logger LOG = LoggerFactory
- .getLogger(ClearThread.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ClearThread.class);
private final int REQUEST_TIMEOUT_SECS;
private static final int TIMEOUT_CHECK_SECS = 5;
@@ -42,10 +41,7 @@ public class ClearThread extends RunnableCallback {
public ClearThread(Drpc drpc) {
drpcService = drpc;
- REQUEST_TIMEOUT_SECS =
- JStormUtils.parseInt(
- drpcService.getConf().get(
- Config.DRPC_REQUEST_TIMEOUT_SECS), 60);
+ REQUEST_TIMEOUT_SECS = JStormUtils.parseInt(drpcService.getConf().get(Config.DRPC_REQUEST_TIMEOUT_SECS), 60);
LOG.info("Drpc timeout seconds is " + REQUEST_TIMEOUT_SECS);
}
@@ -56,8 +52,7 @@ public class ClearThread extends RunnableCallback {
if (TimeUtils.time_delta(e.getValue()) > REQUEST_TIMEOUT_SECS) {
String id = e.getKey();
- drpcService.getIdtoResult().put(id,
- new DRPCExecutionException("Request timed out"));
+ drpcService.getIdtoResult().put(id, new DRPCExecutionException("Request timed out"));
Semaphore s = drpcService.getIdtoSem().get(id);
if (s != null) {
s.release();
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java b/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java
index 09b4885..adbec06 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/drpc/Drpc.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import com.alibaba.jstorm.utils.JStormServerUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.THsHaServer;
@@ -53,8 +54,7 @@ import com.alibaba.jstorm.utils.TimeUtils;
* @author yannian
*
*/
-public class Drpc implements DistributedRPC.Iface,
- DistributedRPCInvocations.Iface, Shutdownable {
+public class Drpc implements DistributedRPC.Iface, DistributedRPCInvocations.Iface, Shutdownable {
private static final Logger LOG = LoggerFactory.getLogger(Drpc.class);
@@ -76,23 +76,18 @@ public class Drpc implements DistributedRPC.Iface,
private AtomicBoolean shutdown = new AtomicBoolean(false);
- private THsHaServer initHandlerServer(Map conf, final Drpc service)
- throws Exception {
+ private THsHaServer initHandlerServer(Map conf, final Drpc service) throws Exception {
int port = JStormUtils.parseInt(conf.get(Config.DRPC_PORT));
- int workerThreadNum =
- JStormUtils.parseInt(conf.get(Config.DRPC_WORKER_THREADS));
+ int workerThreadNum = JStormUtils.parseInt(conf.get(Config.DRPC_WORKER_THREADS));
int queueSize = JStormUtils.parseInt(conf.get(Config.DRPC_QUEUE_SIZE));
TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
THsHaServer.Args targs = new THsHaServer.Args(socket);
targs.workerThreads(64);
targs.protocolFactory(new TBinaryProtocol.Factory());
- targs.processor(new DistributedRPC.Processor<DistributedRPC.Iface>(
- service));
+ targs.processor(new DistributedRPC.Processor<DistributedRPC.Iface>(service));
- ThreadPoolExecutor executor =
- new ThreadPoolExecutor(workerThreadNum, workerThreadNum, 60,
- TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize));
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(workerThreadNum, workerThreadNum, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize));
targs.executorService(executor);
THsHaServer handlerServer = new THsHaServer(targs);
@@ -101,17 +96,14 @@ public class Drpc implements DistributedRPC.Iface,
return handlerServer;
}
- private THsHaServer initInvokeServer(Map conf, final Drpc service)
- throws Exception {
+ private THsHaServer initInvokeServer(Map conf, final Drpc service) throws Exception {
int port = JStormUtils.parseInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
THsHaServer.Args targsInvoke = new THsHaServer.Args(socket);
targsInvoke.workerThreads(64);
targsInvoke.protocolFactory(new TBinaryProtocol.Factory());
- targsInvoke
- .processor(new DistributedRPCInvocations.Processor<DistributedRPCInvocations.Iface>(
- service));
+ targsInvoke.processor(new DistributedRPCInvocations.Processor<DistributedRPCInvocations.Iface>(service));
THsHaServer invokeServer = new THsHaServer(targsInvoke);
@@ -136,6 +128,7 @@ public class Drpc implements DistributedRPC.Iface,
LOG.info("Starting Distributed RPC servers...");
new Thread(new Runnable() {
+
@Override
public void run() {
invokeServer.serve();
@@ -148,11 +141,18 @@ public class Drpc implements DistributedRPC.Iface,
clearThread = new AsyncLoopThread(new ClearThread(this));
LOG.info("Successfully start clear thread");
}
+ private void createPid(Map conf) throws Exception {
+ String pidDir = StormConfig.drpcPids(conf);
+
+ JStormServerUtils.createPid(pidDir);
+ }
public void init() throws Exception {
conf = StormConfig.read_storm_config();
LOG.info("Configuration is \n" + conf);
+ createPid(conf);
+
initClearThread();
initThrift();
@@ -188,14 +188,10 @@ public class Drpc implements DistributedRPC.Iface,
}
private AtomicInteger ctr = new AtomicInteger(0);
- private ConcurrentHashMap<String, Semaphore> idtoSem =
- new ConcurrentHashMap<String, Semaphore>();
- private ConcurrentHashMap<String, Object> idtoResult =
- new ConcurrentHashMap<String, Object>();
- private ConcurrentHashMap<String, Integer> idtoStart =
- new ConcurrentHashMap<String, Integer>();
- private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues =
- new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>();
+ private ConcurrentHashMap<String, Semaphore> idtoSem = new ConcurrentHashMap<String, Semaphore>();
+ private ConcurrentHashMap<String, Object> idtoResult = new ConcurrentHashMap<String, Object>();
+ private ConcurrentHashMap<String, Integer> idtoStart = new ConcurrentHashMap<String, Integer>();
+ private ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>> requestQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<DRPCRequest>>();
public void cleanup(String id) {
LOG.info("clean id " + id + " @ " + (System.currentTimeMillis()));
@@ -206,10 +202,8 @@ public class Drpc implements DistributedRPC.Iface,
}
@Override
- public String execute(String function, String args)
- throws DRPCExecutionException, TException {
- LOG.info("Received DRPC request for " + function + " " + args + " at "
- + (System.currentTimeMillis()));
+ public String execute(String function, String args) throws DRPCExecutionException, TException {
+ LOG.info("Received DRPC request for " + function + " " + args + " at " + (System.currentTimeMillis()));
int idinc = this.ctr.incrementAndGet();
int maxvalue = 1000000000;
int newid = idinc % maxvalue;
@@ -225,19 +219,16 @@ public class Drpc implements DistributedRPC.Iface,
this.idtoSem.put(strid, sem);
ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(function);
queue.add(req);
- LOG.info("Waiting for DRPC request for " + function + " " + args
- + " at " + (System.currentTimeMillis()));
+ LOG.info("Waiting for DRPC request for " + function + " " + args + " at " + (System.currentTimeMillis()));
try {
sem.acquire();
} catch (InterruptedException e) {
LOG.error("acquire fail ", e);
}
- LOG.info("Acquired for DRPC request for " + function + " " + args
- + " at " + (System.currentTimeMillis()));
+ LOG.info("Acquired for DRPC request for " + function + " " + args + " at " + (System.currentTimeMillis()));
Object result = this.idtoResult.get(strid);
- LOG.info("Returning for DRPC request for " + function + " " + args
- + " at " + (System.currentTimeMillis()));
+ LOG.info("Returning for DRPC request for " + function + " " + args + " at " + (System.currentTimeMillis()));
this.cleanup(strid);
@@ -250,8 +241,7 @@ public class Drpc implements DistributedRPC.Iface,
@Override
public void result(String id, String result) throws TException {
Semaphore sem = this.idtoSem.get(id);
- LOG.info("Received result " + result + " for id " + id + " at "
- + (System.currentTimeMillis()));
+ LOG.info("Received result " + result + " for id " + id + " at " + (System.currentTimeMillis()));
if (sem != null) {
this.idtoResult.put(id, result);
sem.release();
@@ -265,8 +255,7 @@ public class Drpc implements DistributedRPC.Iface,
ConcurrentLinkedQueue<DRPCRequest> queue = acquireQueue(functionName);
DRPCRequest req = queue.poll();
if (req != null) {
- LOG.info("Fetched request for " + functionName + " at "
- + (System.currentTimeMillis()));
+ LOG.info("Fetched request for " + functionName + " at " + (System.currentTimeMillis()));
return req;
} else {
return new DRPCRequest("", "");
@@ -277,18 +266,15 @@ public class Drpc implements DistributedRPC.Iface,
@Override
public void failRequest(String id) throws TException {
Semaphore sem = this.idtoSem.get(id);
- LOG.info("failRequest result for id " + id + " at "
- + (System.currentTimeMillis()));
+ LOG.info("failRequest result for id " + id + " at " + (System.currentTimeMillis()));
if (sem != null) {
- this.idtoResult.put(id,
- new DRPCExecutionException("Request failed"));
+ this.idtoResult.put(id, new DRPCExecutionException("Request failed"));
sem.release();
}
}
private ConcurrentLinkedQueue<DRPCRequest> acquireQueue(String function) {
- ConcurrentLinkedQueue<DRPCRequest> reqQueue =
- requestQueues.get(function);
+ ConcurrentLinkedQueue<DRPCRequest> reqQueue = requestQueues.get(function);
if (reqQueue == null) {
reqQueue = new ConcurrentLinkedQueue<DRPCRequest>();
requestQueues.put(function, reqQueue);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java b/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java
index cdfa1cc..2ec7e20 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerImp.java
@@ -29,14 +29,12 @@ import com.alibaba.jstorm.callback.RunnableCallback;
* Event Manager, drop one event from queue, then execute the event.
*/
public class EventManagerImp extends RunnableCallback implements EventManager {
- private static final Logger LOG = LoggerFactory
- .getLogger(EventManagerImp.class);
+ private static final Logger LOG = LoggerFactory.getLogger(EventManagerImp.class);
private AtomicInteger added = new AtomicInteger();
private AtomicInteger processed = new AtomicInteger();
- private LinkedBlockingQueue<RunnableCallback> queue =
- new LinkedBlockingQueue<RunnableCallback>();
+ private LinkedBlockingQueue<RunnableCallback> queue = new LinkedBlockingQueue<RunnableCallback>();
private Exception e;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java b/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java
index 7f5e9ef..e4bf9e2 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/event/EventManagerPusher.java
@@ -30,8 +30,7 @@ public class EventManagerPusher extends RunnableCallback {
private int frequence;
- public EventManagerPusher(EventManager eventManager,
- RunnableCallback event, int frequence) {
+ public EventManagerPusher(EventManager eventManager, RunnableCallback event, int frequence) {
this.eventManager = eventManager;
this.event = event;
this.frequence = frequence;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java
index d68347a..a83f07f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/ControlMessage.java
@@ -24,11 +24,10 @@ import org.jboss.netty.buffer.ChannelBuffers;
public enum ControlMessage {
EOB_MESSAGE((short) -201), OK_RESPONSE((short) -200);
-
private short code;
private long timeStamp;
protected static int port;
-
+
static public void setPort(int port) {
ControlMessage.port = port;
}
@@ -62,9 +61,7 @@ public enum ControlMessage {
* @throws Exception
*/
ChannelBuffer buffer() throws Exception {
- ChannelBufferOutputStream bout =
- new ChannelBufferOutputStream(
- ChannelBuffers.directBuffer(encodeLength()));
+ ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));
write(bout);
bout.close();
return bout.buffer();
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java
index 172822d..45d6600 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageBatch.java
@@ -28,8 +28,7 @@ import org.slf4j.LoggerFactory;
import backtype.storm.messaging.TaskMessage;
class MessageBatch {
- private static final Logger LOG = LoggerFactory
- .getLogger(MessageBatch.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MessageBatch.class);
private int buffer_size;
private ArrayList<Object> msgs;
private int encoded_length;
@@ -58,8 +57,7 @@ class MessageBatch {
return;
}
- throw new RuntimeException("Unsuppoted object type "
- + obj.getClass().getName());
+ throw new RuntimeException("Unsupported object type " + obj.getClass().getName());
}
void remove(Object obj) {
@@ -89,8 +87,7 @@ class MessageBatch {
* try to add a TaskMessage to a batch
*
* @param taskMsg
- * @return false if the msg could not be added due to buffer size limit;
- * true otherwise
+ * @return false if the msg could not be added due to buffer size limit; true otherwise
*/
boolean tryAdd(TaskMessage taskMsg) {
if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size)
@@ -144,9 +141,7 @@ class MessageBatch {
* create a buffer containing the encoding of this batch
*/
ChannelBuffer buffer() throws Exception {
- ChannelBufferOutputStream bout =
- new ChannelBufferOutputStream(
- ChannelBuffers.directBuffer(encoded_length));
+ ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
for (Object msg : msgs)
if (msg instanceof TaskMessage)
@@ -168,19 +163,16 @@ class MessageBatch {
/**
* write a TaskMessage into a stream
*
- * Each TaskMessage is encoded as: task ... short(2) len ... int(4) payload
- * ... byte[] *
+ * Each TaskMessage is encoded as: task ... short(2) len ... int(4) payload ... byte[] *
*/
- private void writeTaskMessage(ChannelBufferOutputStream bout,
- TaskMessage message) throws Exception {
+ private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
int payload_len = 0;
if (message.message() != null)
payload_len = message.message().length;
int task_id = message.task();
if (task_id > Short.MAX_VALUE)
- throw new RuntimeException("Task ID should not exceed "
- + Short.MAX_VALUE);
+ throw new RuntimeException("Task ID should not exceed " + Short.MAX_VALUE);
bout.writeShort((short) task_id);
bout.writeInt(payload_len);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java
index b147092..38e7930 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageDecoder.java
@@ -17,11 +17,17 @@
*/
package com.alibaba.jstorm.message.netty;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
+import backtype.storm.messaging.TaskMessage;
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.common.metric.AsmHistogram;
+import com.alibaba.jstorm.common.metric.AsmMeter;
+import com.alibaba.jstorm.common.metric.AsmMetric;
+import com.alibaba.jstorm.metric.JStormMetrics;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.metric.MetricDef;
+import com.alibaba.jstorm.utils.NetWorkUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -29,46 +35,40 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.common.metric.Histogram;
-import com.alibaba.jstorm.common.metric.Meter;
-import com.alibaba.jstorm.metric.JStormMetrics;
-import com.alibaba.jstorm.metric.MetricDef;
-import com.alibaba.jstorm.utils.NetWorkUtils;
-
-import backtype.storm.messaging.TaskMessage;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
public class MessageDecoder extends FrameDecoder {
- private static final Logger LOG = LoggerFactory
- .getLogger(MessageDecoder.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MessageDecoder.class);
// Note: a Timer is not used here to avoid lock contention
- private static Histogram timer = JStormMetrics
- .registerWorkerHistogram(MetricDef.NETWORK_MSG_DECODE_TIME);
- private static Meter recvSpeed = JStormMetrics
- .registerWorkerMeter(MetricDef.NETTY_SRV_RECV_SPEED);
- private static Map<Channel, Histogram> networkTransmitTimeMap =
- new HashMap<Channel, Histogram>();
- private static Map<Channel, String> transmitNameMap =
- new HashMap<Channel, String>();
+
+ private static AsmHistogram msgDecodeTime = (AsmHistogram) JStormMetrics.registerWorkerMetric(
+ MetricUtils.workerMetricName(MetricDef.NETWORK_MSG_DECODE_TIME, MetricType.HISTOGRAM), new AsmHistogram());
+ private static AsmMeter recvSpeed = (AsmMeter) JStormMetrics.registerWorkerMetric(
+ MetricUtils.workerMetricName(MetricDef.NETTY_SRV_RECV_SPEED, MetricType.METER), new AsmMeter());
+
+ private static Map<Channel, AsmHistogram> networkTransmitTimeMap = new HashMap<Channel, AsmHistogram>();
+ private static Map<Channel, String> transmitNameMap = new HashMap<Channel, String>();
private boolean isServer;
private String localIp;
private int localPort;
+ private boolean enableTransitTimeMetrics;
+
public MessageDecoder(boolean isServer, Map conf) {
this.isServer = isServer;
this.localPort = ConfigExtension.getLocalWorkerPort(conf);
this.localIp = NetWorkUtils.ip();
-
+ this.enableTransitTimeMetrics = MetricUtils.isEnableNettyMetrics(conf);
}
/*
- * Each ControlMessage is encoded as: code (<0) ... short(2) Each
- * TaskMessage is encoded as: task (>=0) ... short(2) len ... int(4) payload
- * ... byte[] *
+ * Each ControlMessage is encoded as: code (<0) ... short(2). Each TaskMessage is encoded as: task (>=0) ... short(2), len ... int(4), payload ... byte[].
*/
- protected Object decode(ChannelHandlerContext ctx, Channel channel,
- ChannelBuffer buf) throws Exception {
+ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
// Make sure that we have received at least a short
long available = buf.readableBytes();
// Length of control message is 10.
@@ -106,21 +106,20 @@ public class MessageDecoder extends FrameDecoder {
available -= 12;
if (ctrl_msg == ControlMessage.EOB_MESSAGE) {
+
long interval = System.currentTimeMillis() - timeStamp;
- if (interval > 0) {
-
- Histogram netTransTime =
- getTransmitHistogram(channel, clientPort);
- if (netTransTime != null) {
- netTransTime.update(interval );
-
- }
+ if (interval < 0)
+ interval = 0;
+
+ if(enableTransitTimeMetrics) {
+ AsmHistogram netTransTime = getTransmitHistogram(channel, clientPort);
+ if (netTransTime != null) {
+ netTransTime.update(interval * TimeUtils.NS_PER_US);
+ }
}
-
- recvSpeed.update(Double.valueOf(ControlMessage
- .encodeLength()));
}
+ recvSpeed.update(ControlMessage.encodeLength());
return ctrl_msg;
}
@@ -138,9 +137,7 @@ public class MessageDecoder extends FrameDecoder {
// Read the length field.
int length = buf.readInt();
if (length <= 0) {
- LOG.info(
- "Receive one message whose TaskMessage's message length is {}",
- length);
+ LOG.info("Receive one message whose TaskMessage's message length is {}", length);
return new TaskMessage(task, null);
}
@@ -165,72 +162,55 @@ public class MessageDecoder extends FrameDecoder {
// task, length, JStormUtils.toPrintableString(rawBytes));
TaskMessage ret = new TaskMessage(task, rawBytes);
- recvSpeed.update(Double.valueOf(rawBytes.length + 6));
+ recvSpeed.update(rawBytes.length + 6);
return ret;
} finally {
if (isServer) {
Long endTime = System.nanoTime();
- timer.update((endTime - startTime) / 1000000.0d);
+ msgDecodeTime.update((endTime - startTime) / TimeUtils.NS_PER_US);
}
}
}
- public Histogram getTransmitHistogram(Channel channel, int clientPort) {
- Histogram netTransTime = networkTransmitTimeMap.get(channel);
+ public AsmHistogram getTransmitHistogram(Channel channel, int clientPort) {
+ AsmHistogram netTransTime = networkTransmitTimeMap.get(channel);
if (netTransTime == null) {
+ InetSocketAddress sockAddr = (InetSocketAddress) (channel.getRemoteAddress());
+
+ String nettyConnection = NettyConnection.mkString(sockAddr.getAddress().getHostAddress(), clientPort, localIp, localPort);
+ netTransTime =
+ (AsmHistogram) JStormMetrics.registerNettyMetric(
+ MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection), MetricType.HISTOGRAM),
+ new AsmHistogram());
- InetSocketAddress sockAddr =
- (InetSocketAddress) (channel.getRemoteAddress());
-
- String nettyConnection =
- NettyConnection.mkString(sockAddr.getAddress()
- .getHostAddress(), clientPort, localIp, localPort);
- try {
- netTransTime =
- JStormMetrics.registerWorkerHistogram(
- MetricDef.NETTY_SRV_MSG_TRANS_TIME,
- nettyConnection);
- } catch (Exception e) {
- LOG.error("{}.{} has been register",
- MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection);
- removeTransmitHistogram(nettyConnection);
- return null;
- }
networkTransmitTimeMap.put(channel, netTransTime);
transmitNameMap.put(channel, nettyConnection);
- LOG.info("Register Transmit Histogram of {}, channel {}",
- nettyConnection, channel);
+ LOG.info("Register Transmit Histogram of {}, channel {}", nettyConnection, channel);
}
return netTransTime;
}
public static void removeTransmitHistogram(Channel channel) {
- Histogram netTransTime = networkTransmitTimeMap.remove(channel);
+ AsmHistogram netTransTime = networkTransmitTimeMap.remove(channel);
if (netTransTime != null) {
-
String nettyConnection = transmitNameMap.remove(channel);
-
- JStormMetrics.unregisterWorkerMetric(
- MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection);
-
- LOG.info("Remove Transmit Histogram of {}, channel {}",
- nettyConnection, channel);
+ JStormMetrics.unregisterNettyMetric(MetricUtils.nettyMetricName(AsmMetric.mkName(MetricDef.NETTY_SRV_MSG_TRANS_TIME, nettyConnection),
+ MetricType.HISTOGRAM));
+ LOG.info("Remove Transmit Histogram of {}, channel {}", nettyConnection, channel);
}
}
-
+
public static void removeTransmitHistogram(String nettyConnection) {
Channel oldChannel = null;
-
- for (Entry<Channel, String> entry: transmitNameMap.entrySet()) {
+
+ for (Entry<Channel, String> entry : transmitNameMap.entrySet()) {
if (nettyConnection.equals(entry.getValue())) {
oldChannel = entry.getKey();
}
}
-
+
removeTransmitHistogram(oldChannel);
}
-
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java
index 61e9187..0a750e0 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/message/netty/MessageEncoder.java
@@ -23,8 +23,7 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
public class MessageEncoder extends OneToOneEncoder {
@Override
- protected Object encode(ChannelHandlerContext ctx, Channel channel,
- Object obj) throws Exception {
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
if (obj instanceof ControlMessage) {
return ((ControlMessage) obj).buffer();
}
@@ -33,8 +32,7 @@ public class MessageEncoder extends OneToOneEncoder {
return ((MessageBatch) obj).buffer();
}
- throw new RuntimeException("Unsupported encoding of object of class "
- + obj.getClass().getName());
+ throw new RuntimeException("Unsupported encoding of object of class " + obj.getClass().getName());
}
}