You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/02/14 14:07:41 UTC
[6/9] storm git commit: STORM-2306 - Messaging subsystem redesign.
New Backpressure model.
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/StormTimer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormTimer.java b/storm-client/src/jvm/org/apache/storm/StormTimer.java
index b2e2b4a..0f54ce1 100644
--- a/storm-client/src/jvm/org/apache/storm/StormTimer.java
+++ b/storm-client/src/jvm/org/apache/storm/StormTimer.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,8 +18,8 @@
package org.apache.storm;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
import java.nio.channels.ClosedByInterruptException;
import java.util.Comparator;
@@ -97,6 +97,9 @@ public class StormTimer implements AutoCloseable {
// events.
Time.sleep(1000);
}
+ if (Thread.interrupted()) {
+ this.active.set(false);
+ }
} catch (Throwable e) {
if (!(Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e))
&& !(Utils.exceptionCauseIsInstanceOf(ClosedByInterruptException.class, e))) {
@@ -158,6 +161,17 @@ public class StormTimer implements AutoCloseable {
* @param jitterMs add jitter to the run
*/
public void schedule(int delaySecs, Runnable func, boolean checkActive, int jitterMs) {
+ scheduleMs(Time.secsToMillisLong(delaySecs), func, checkActive, jitterMs);
+ }
+
+ /**
+ * Same as schedule, but the delay is given with millisecond resolution.
+ * @param delayMs the number of milliseconds to delay before running the function
+ * @param func the function to run
+ * @param checkActive whether to check if the timer is active
+ * @param jitterMs add jitter to the run
+ */
+ public void scheduleMs(long delayMs, Runnable func, boolean checkActive, int jitterMs) {
if (func == null) {
throw new RuntimeException("function to schedule is null!");
}
@@ -165,7 +179,7 @@ public class StormTimer implements AutoCloseable {
checkActive();
}
String id = Utils.uuid();
- long endTimeMs = Time.currentTimeMillis() + Time.secsToMillisLong(delaySecs);
+ long endTimeMs = Time.currentTimeMillis() + delayMs;
if (jitterMs > 0) {
endTimeMs = this.task.random.nextInt(jitterMs) + endTimeMs;
}
@@ -176,6 +190,10 @@ public class StormTimer implements AutoCloseable {
schedule(delaySecs, func, true, 0);
}
+ public void scheduleMs(long delayMs, Runnable func) {
+ scheduleMs(delayMs, func, true, 0);
+ }
+
/**
* Schedule a function to run recurrently
* @param delaySecs the number of seconds to delay before running the function
@@ -194,6 +212,24 @@ public class StormTimer implements AutoCloseable {
}
/**
+ * Schedule a function to run recurrently
+ * @param delayMs the number of millis to delay before running the function
+ * @param recurMs the time in milliseconds between each invocation
+ * @param func the function to run
+ */
+ public void scheduleRecurringMs(long delayMs, final long recurMs, final Runnable func) {
+ scheduleMs(delayMs, new Runnable() {
+ @Override
+ public void run() {
+ func.run();
+ // This avoids a race condition with cancel-timer.
+ scheduleMs(recurMs, this, true, 0);
+ }
+ });
+ }
+
+
+ /**
* schedule a function to run recurrently with jitter
* @param delaySecs the number of seconds to delay before running the function
* @param recurSecs the time between each invocation
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index c61c104..362d4dd 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index adc434c..ab893be 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -40,21 +40,21 @@ import org.apache.storm.generated.WorkerTokenServiceType;
import org.apache.storm.nimbus.NimbusInfo;
public interface IStormClusterState {
- public List<String> assignments(Runnable callback);
+ List<String> assignments(Runnable callback);
- public Assignment assignmentInfo(String stormId, Runnable callback);
+ Assignment assignmentInfo(String stormId, Runnable callback);
- public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
+ VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
- public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
+ Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
- public List<String> blobstoreInfo(String blobKey);
+ List<String> blobstoreInfo(String blobKey);
- public List<NimbusSummary> nimbuses();
+ List<NimbusSummary> nimbuses();
- public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
+ void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
- public List<String> activeStorms();
+ List<String> activeStorms();
/**
* Get a storm base for a topology
@@ -62,87 +62,95 @@ public interface IStormClusterState {
* @param callback something to call if the data changes (best effort)
* @return the StormBase or null if it is not alive.
*/
- public StormBase stormBase(String stormId, Runnable callback);
+ StormBase stormBase(String stormId, Runnable callback);
- public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
+ ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
- public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
+ List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
- public List<ProfileRequest> getTopologyProfileRequests(String stormId);
+ List<ProfileRequest> getTopologyProfileRequests(String stormId);
- public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
+ void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
- public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
+ void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
- public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
+ Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
- public List<String> supervisors(Runnable callback);
+ List<String> supervisors(Runnable callback);
- public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
+ SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
- public void setupHeatbeats(String stormId);
+ void setupHeatbeats(String stormId);
- public void teardownHeartbeats(String stormId);
+ void teardownHeartbeats(String stormId);
- public void teardownTopologyErrors(String stormId);
+ void teardownTopologyErrors(String stormId);
- public List<String> heartbeatStorms();
+ List<String> heartbeatStorms();
- public List<String> errorTopologies();
+ List<String> errorTopologies();
- public List<String> backpressureTopologies();
+ /** @deprecated In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */
+ @Deprecated
+ List<String> backpressureTopologies();
- public void setTopologyLogConfig(String stormId, LogConfig logConfig);
+ void setTopologyLogConfig(String stormId, LogConfig logConfig);
- public LogConfig topologyLogConfig(String stormId, Runnable cb);
+ LogConfig topologyLogConfig(String stormId, Runnable cb);
- public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info);
+ void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info);
- public void removeWorkerHeartbeat(String stormId, String node, Long port);
+ void removeWorkerHeartbeat(String stormId, String node, Long port);
- public void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
+ void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
- public void workerBackpressure(String stormId, String node, Long port, long timestamp);
+ /** @deprecated In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */
+ @Deprecated
+ boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback);
- public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback);
+ /** @deprecated In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */
+ @Deprecated
+ void setupBackpressure(String stormId);
- public void setupBackpressure(String stormId);
+ /** @deprecated In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */
+ @Deprecated
+ void removeBackpressure(String stormId);
- public void removeBackpressure(String stormId);
+ /** @deprecated In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */
+ @Deprecated
+ void removeWorkerBackpressure(String stormId, String node, Long port);
- public void removeWorkerBackpressure(String stormId, String node, Long port);
+ void activateStorm(String stormId, StormBase stormBase);
- public void activateStorm(String stormId, StormBase stormBase);
+ void updateStorm(String stormId, StormBase newElems);
- public void updateStorm(String stormId, StormBase newElems);
+ void removeStormBase(String stormId);
- public void removeStormBase(String stormId);
+ void setAssignment(String stormId, Assignment info);
- public void setAssignment(String stormId, Assignment info);
+ void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo);
- public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo);
+ List<String> activeKeys();
- public List<String> activeKeys();
+ List<String> blobstore(Runnable callback);
- public List<String> blobstore(Runnable callback);
+ void removeStorm(String stormId);
- public void removeStorm(String stormId);
+ void removeBlobstoreKey(String blobKey);
- public void removeBlobstoreKey(String blobKey);
+ void removeKeyVersion(String blobKey);
- public void removeKeyVersion(String blobKey);
+ void reportError(String stormId, String componentId, String node, Long port, Throwable error);
- public void reportError(String stormId, String componentId, String node, Long port, Throwable error);
+ List<ErrorInfo> errors(String stormId, String componentId);
- public List<ErrorInfo> errors(String stormId, String componentId);
+ ErrorInfo lastError(String stormId, String componentId);
- public ErrorInfo lastError(String stormId, String componentId);
+ void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException;
- public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException;
+ Credentials credentials(String stormId, Runnable callback);
- public Credentials credentials(String stormId, Runnable callback);
-
- public void disconnect();
+ void disconnect();
/**
* Get a private key used to validate a token is correct.
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 11757c0..719ebbf 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -71,6 +71,7 @@ public class StormClusterStateImpl implements IStormClusterState {
private AtomicReference<Runnable> supervisorsCallback;
// we want to register a topo directory getChildren callback for all workers of this dir
private ConcurrentHashMap<String, Runnable> backPressureCallback;
+
private AtomicReference<Runnable> assignmentsCallback;
private ConcurrentHashMap<String, Runnable> stormBaseCallback;
private AtomicReference<Runnable> blobstoreCallback;
@@ -398,6 +399,11 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
+ public List<String> backpressureTopologies() {
+ return stateStorage.get_children(ClusterUtils.BACKPRESSURE_SUBTREE, false);
+ }
+
+ @Override
public List<String> heartbeatStorms() {
return stateStorage.get_worker_hb_children(ClusterUtils.WORKERBEATS_SUBTREE, false);
}
@@ -408,11 +414,6 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
- public List<String> backpressureTopologies() {
- return stateStorage.get_children(ClusterUtils.BACKPRESSURE_SUBTREE, false);
- }
-
- @Override
public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), defaultAcls);
}
@@ -447,35 +448,6 @@ public class StormClusterStateImpl implements IStormClusterState {
}
/**
- * If znode exists and timestamp is non-positive, delete;
- * if exists and timestamp is larger than 0, update the timestamp;
- * if not exists and timestamp is larger than 0, create the znode and set the timestamp;
- * if not exists and timestamp is non-positive, do nothing.
- * @param stormId The topology Id
- * @param node The node id
- * @param port The port number
- * @param timestamp The backpressure timestamp. Non-positive means turning off the worker backpressure
- */
- @Override
- public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
- String path = ClusterUtils.backpressurePath(stormId, node, port);
- boolean existed = stateStorage.node_exists(path, false);
- if (existed) {
- if (timestamp <= 0) {
- stateStorage.delete_node(path);
- } else {
- byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
- stateStorage.set_data(path, data, defaultAcls);
- }
- } else {
- if (timestamp > 0) {
- byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
- stateStorage.set_ephemeral_node(path, data, defaultAcls);
- }
- }
- }
-
- /**
* Check whether a topology is in throttle-on status or not:
* if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
* But if the backpresure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off.
@@ -495,17 +467,16 @@ public class StormClusterStateImpl implements IStormClusterState {
if(stateStorage.node_exists(path, false)) {
List<String> children = stateStorage.get_children(path, callback != null);
mostRecentTimestamp = children.stream()
- .map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
- .filter(data -> data != null)
- .mapToLong(data -> ByteBuffer.wrap(data).getLong())
- .max()
- .orElse(0);
+ .map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
+ .filter(data -> data != null)
+ .mapToLong(data -> ByteBuffer.wrap(data).getLong())
+ .max()
+ .orElse(0);
}
boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < timeoutMs);
LOG.debug("topology backpressure is {}", ret ? "on" : "off");
return ret;
}
-
@Override
public void setupBackpressure(String stormId) {
stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), defaultAcls);
@@ -533,13 +504,18 @@ public class StormClusterStateImpl implements IStormClusterState {
stateStorage.delete_node(path);
}
}
-
@Override
public void activateStorm(String stormId, StormBase stormBase) {
String path = ClusterUtils.stormPath(stormId);
stateStorage.set_data(path, Utils.serialize(stormBase), defaultAcls);
}
+ /**
+ * To update this function due to APersistentMap/APersistentSet is clojure's structure
+ *
+ * @param stormId
+ * @param newElems
+ */
@Override
public void updateStorm(String stormId, StormBase newElems) {
@@ -864,7 +840,7 @@ public class StormClusterStateImpl implements IStormClusterState {
return ret;
}
- private List<String> tokenizePath(String path) {
+ private static List<String> tokenizePath(String path) {
String[] toks = path.split("/");
java.util.ArrayList<String> rtn = new ArrayList<>();
for (String str : toks) {
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
index e1581eb..3669c1b 100644
--- a/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollector.java
@@ -40,7 +40,12 @@ public abstract class BatchOutputCollector {
emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
}
- public abstract void emitDirect(int taskId, String streamId, List<Object> tuple);
-
+ public abstract void emitDirect(int taskId, String streamId, List<Object> tuple);
+
+ /**
+ * Flush any buffered tuples (when batching is enabled)
+ */
+ public abstract void flush();
+
public abstract void reportError(Throwable error);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
index 246ae5d..8814f77 100644
--- a/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/coordination/BatchOutputCollectorImpl.java
@@ -39,6 +39,11 @@ public class BatchOutputCollectorImpl extends BatchOutputCollector {
}
@Override
+ public void flush() {
+ _collector.flush();
+ }
+
+ @Override
public void reportError(Throwable error) {
_collector.reportError(error);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java b/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
index b0706c2..b32b05e 100644
--- a/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -124,10 +124,14 @@ public class CoordinatedBolt implements IRichBolt {
_delegate.fail(tuple);
}
+ public void flush() {
+ _delegate.flush();
+ }
+
public void resetTimeout(Tuple tuple) {
_delegate.resetTimeout(tuple);
}
-
+
public void reportError(Throwable error) {
_delegate.reportError(error);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Acker.java b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
index 8675e39..58a14e2 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,6 +17,7 @@
*/
package org.apache.storm.daemon;
+import org.apache.storm.Constants;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -102,14 +103,16 @@ public class Acker implements IBolt {
if (curr != null) {
pending.put(id, curr);
} //else if it has not been added yet, there is no reason time it out later on
+ } else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
+ collector.flush();
+ return;
} else {
LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
return;
}
int task = curr.spoutTask;
- if (curr != null && task >= 0
- && (curr.val == 0 || curr.failed || resetTimeout)) {
+ if (task >= 0 && (curr.val == 0 || curr.failed || resetTimeout)) {
Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
if (curr.val == 0) {
pending.remove(id);
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
index 179e882..bd0ac9a 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -137,7 +137,7 @@ public class GrouperFactory {
public static class FieldsGrouper implements CustomStreamGrouping {
private Fields outFields;
- private List<Integer> targetTasks;
+ private List<List<Integer> > targetTasks;
private Fields groupFields;
private int numTasks;
@@ -149,14 +149,17 @@ public class GrouperFactory {
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
- this.targetTasks = targetTasks;
+ this.targetTasks = new ArrayList<List<Integer>>();
+ for (Integer targetTask : targetTasks) {
+ this.targetTasks.add(Collections.singletonList(targetTask));
+ }
this.numTasks = targetTasks.size();
}
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
- return Collections.singletonList(targetTasks.get(targetTaskIndex));
+ return targetTasks.get(targetTaskIndex);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index f6053b9..f2ebd5f 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -126,7 +126,7 @@ public class StormCommon {
}
return keys;
}
-
+
private static void validateIds(StormTopology topology) throws InvalidTopologyException {
List<String> componentIds = new ArrayList<>();
componentIds.addAll(validateIds(topology.get_bolts()));
@@ -362,6 +362,9 @@ public class StormCommon {
public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+ if (numExecutors==null || numExecutors==0) {
+ return;
+ }
HashMap<String, Object> componentConf = new HashMap<>();
componentConf.put(Config.TOPOLOGY_TASKS, numExecutors);
componentConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
@@ -442,6 +445,7 @@ public class StormCommon {
public static void addSystemComponents(Map<String, Object> conf, StormTopology topology) {
Map<String, StreamInfo> outputStreams = new HashMap<>();
outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
+ outputStreams.put(Constants.SYSTEM_FLUSH_STREAM_ID, Thrift.outputFields(Arrays.asList()));
outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
outputStreams.put(Constants.CREDENTIALS_CHANGED_STREAM_ID, Thrift.outputFields(Arrays.asList("creds")));
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index 658aadc..ce9d0e4 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,23 +15,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.daemon;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Random;
+import java.util.function.BooleanSupplier;
+
+
import org.apache.storm.Config;
import org.apache.storm.Thrift;
-import org.apache.storm.daemon.metrics.BuiltinMetrics;
-import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.ExecutorTransfer;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.DebugOptions;
import org.apache.storm.generated.JavaObject;
import org.apache.storm.generated.ShellComponent;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StateSpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
-import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.hooks.ITaskHook;
import org.apache.storm.hooks.info.EmitInfo;
import org.apache.storm.spout.ShellSpout;
@@ -39,20 +51,15 @@ import org.apache.storm.stats.CommonStats;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
public class Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
@@ -66,21 +73,21 @@ public class Task {
private String componentId;
private Object taskObject; // Spout/Bolt object
private Map<String, Object> topoConf;
- private Callable<Boolean> emitSampler;
+ private BooleanSupplier emitSampler;
private CommonStats executorStats;
private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper;
- private BuiltinMetrics builtInMetrics;
+ private HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> streamToGroupers;
private boolean debug;
public Task(Executor executor, Integer taskId) throws IOException {
this.taskId = taskId;
this.executor = executor;
this.workerData = executor.getWorkerData();
- this.topoConf = executor.getStormConf();
+ this.topoConf = executor.getTopoConf();
this.componentId = executor.getComponentId();
this.streamComponentToGrouper = executor.getStreamToComponentToGrouper();
+ this.streamToGroupers = getGroupersPerStream(streamComponentToGrouper);
this.executorStats = executor.getStats();
- this.builtInMetrics = BuiltinMetricsUtil.mkData(executor.getType(), this.executorStats);
this.workerTopologyContext = executor.getWorkerTopologyContext();
this.emitSampler = ConfigUtils.mkStatsSampler(topoConf);
this.systemTopologyContext = mkTopologyContext(workerData.getSystemTopology());
@@ -103,9 +110,12 @@ public class Task {
if (grouping != null && grouping != GrouperFactory.DIRECT) {
throw new IllegalArgumentException("Cannot emitDirect to a task expecting a regular grouping");
}
- new EmitInfo(values, stream, taskId, Collections.singletonList(outTaskId)).applyOn(userTopologyContext);
+ if (!userTopologyContext.getHooks().isEmpty()) {
+ new EmitInfo(values, stream, taskId, Collections.singletonList(outTaskId)).applyOn(userTopologyContext);
+ }
+
try {
- if (emitSampler.call()) {
+ if (emitSampler.getAsBoolean()) {
executorStats.emittedTuple(stream);
if (null != outTaskId) {
executorStats.transferredTuples(stream, 1);
@@ -120,28 +130,33 @@ public class Task {
return new ArrayList<>(0);
}
+
public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
if (debug) {
LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values);
}
- List<Integer> outTasks = new ArrayList<>();
- if (!streamComponentToGrouper.containsKey(stream)) {
- throw new IllegalArgumentException("Unknown stream ID: " + stream);
- }
- if (null != streamComponentToGrouper.get(stream)) {
- // null value for __system
- for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
+ ArrayList<Integer> outTasks = new ArrayList<>();
+
+ ArrayList<LoadAwareCustomStreamGrouping> groupers = streamToGroupers.get(stream);
+ if (null != groupers) {
+ for (int i=0; i<groupers.size(); ++i) {
+ LoadAwareCustomStreamGrouping grouper = groupers.get(i);
if (grouper == GrouperFactory.DIRECT) {
throw new IllegalArgumentException("Cannot do regular emit to direct stream");
}
List<Integer> compTasks = grouper.chooseTasks(taskId, values);
outTasks.addAll(compTasks);
}
+ } else {
+ throw new IllegalArgumentException("Unknown stream ID: " + stream);
+ }
+
+ if (!userTopologyContext.getHooks().isEmpty()) {
+ new EmitInfo(values, stream, taskId, outTasks).applyOn(userTopologyContext);
}
- new EmitInfo(values, stream, taskId, outTasks).applyOn(userTopologyContext);
try {
- if (emitSampler.call()) {
+ if (emitSampler.getAsBoolean()) {
executorStats.emittedTuple(stream);
executorStats.transferredTuples(stream, outTasks.size());
}
@@ -152,7 +167,7 @@ public class Task {
}
public Tuple getTuple(String stream, List values) {
- return new TupleImpl(systemTopologyContext, values, systemTopologyContext.getThisTaskId(), stream);
+ return new TupleImpl(systemTopologyContext, values, executor.getComponentId(), systemTopologyContext.getThisTaskId(), stream);
}
public Integer getTaskId() {
@@ -171,8 +186,33 @@ public class Task {
return taskObject;
}
- public BuiltinMetrics getBuiltInMetrics() {
- return builtInMetrics;
+
+ /**
+ * Emits {@code values} on {@code stream} as one tuple (built by {@code getTuple}) addressed to every
+ * task returned by {@code getOutgoingTasks}.
+ * Non-blocking call: if a tuple cannot be emitted to its destination immediately, it is added to the
+ * {@code pendingEmits} argument.
+ *
+ * @param stream the stream to emit on
+ * @param values the values of the tuple
+ * @param transfer the transfer each addressed tuple is handed to through {@code tryTransfer}
+ * @param pendingEmits queue handed to {@code tryTransfer} for tuples that could not be sent right away
+ */
+ public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
+ Tuple tuple = getTuple(stream, values);
+ List<Integer> tasks = getOutgoingTasks(stream, values);
+ // the same Tuple instance is wrapped once per destination task
+ for (Integer t : tasks) {
+ AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
+ transfer.tryTransfer(addressedTuple, pendingEmits);
+ }
+ }
+
+ /**
+ * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api).
+ *
+ * @param executor source of the debug options, the storm id and the executor transfer
+ * @param values the tuple values to report
+ * @param componentId component whose debug options are looked up first; also the first field of the event
+ * @param messageId second field of the emitted event
+ * @param random random source used for the sampling decision
+ * @param overflow queue passed on to {@code sendUnanchored} as its pending-emits queue
+ */
+ public void sendToEventLogger(Executor executor, List values,
+ String componentId, Object messageId, Random random, Queue<AddressedTuple> overflow) {
+ Map<String, DebugOptions> componentDebug = executor.getStormComponentDebug().get();
+ // component-level options win; otherwise fall back to the entry keyed by the storm id
+ DebugOptions debugOptions = componentDebug.get(componentId);
+ if (debugOptions == null) {
+ debugOptions = componentDebug.get(executor.getStormId());
+ }
+ // sampling percentage is 0 when no options are found or debugging is not enabled
+ double spct = ((debugOptions != null) && (debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0;
+ // emit only for the sampled fraction: a draw in [0, 100) must fall below the percentage
+ if (spct > 0 && (random.nextDouble() * 100) < spct) {
+ sendUnanchored(StormCommon.EVENTLOGGER_STREAM_ID,
+ new Values(componentId, messageId, System.currentTimeMillis(), values),
+ executor.getExecutorTransfer(), overflow);
+ }
+ }
private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
@@ -190,7 +230,7 @@ public class Task {
ConfigUtils.supervisorStormDistRoot(conf, workerData.getTopologyId())),
ConfigUtils.workerPidsRoot(conf, workerData.getWorkerId()),
taskId,
- workerData.getPort(), workerData.getTaskIds(),
+ workerData.getPort(), workerData.getLocalTaskIds(),
workerData.getDefaultSharedResources(),
workerData.getUserSharedResources(),
executor.getSharedExecutorData(),
@@ -244,4 +284,26 @@ public class Task {
}
}
+ /**
+ * Flattens the stream -> (component -> grouper) map into a stream -> list-of-groupers map.
+ * Every stream key of the input is present in the result; a stream whose grouper map is null
+ * is mapped to an empty list.
+ *
+ * @param streamComponentToGrouper groupers keyed by stream id, then by a second String key
+ * @return a new map holding, per stream, the groupers of that stream
+ */
+ private static HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> getGroupersPerStream(Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper) {
+ HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> result = new HashMap<>(streamComponentToGrouper.size());
+
+ for (Entry<String, Map<String, LoadAwareCustomStreamGrouping>> entry : streamComponentToGrouper.entrySet()) {
+ String stream = entry.getKey();
+ Map<String, LoadAwareCustomStreamGrouping> groupers = entry.getValue();
+ ArrayList<LoadAwareCustomStreamGrouping> perStreamGroupers = new ArrayList<>();
+ if (groupers != null) { // null for __system bolt
+ for (LoadAwareCustomStreamGrouping grouper : groupers.values()) {
+ perStreamGroupers.add(grouper);
+ }
+ }
+ // the (possibly empty) list is stored even when groupers was null
+ result.put(stream, perStreamGroupers);
+ }
+ return result;
+ }
+
+
+ /** Returns the string form of this task's id. */
+ @Override
+ public String toString() {
+ return taskId.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
index ccb1f71..8237ad1 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
@@ -21,25 +21,12 @@ import org.apache.storm.Config;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.metric.api.StateMetric;
-import org.apache.storm.stats.BoltExecutorStats;
-import org.apache.storm.stats.CommonStats;
-import org.apache.storm.stats.SpoutExecutorStats;
-import org.apache.storm.stats.StatsUtil;
import org.apache.storm.task.TopologyContext;
import java.util.HashMap;
import java.util.Map;
public class BuiltinMetricsUtil {
- public static BuiltinMetrics mkData(String type, CommonStats stats) {
- if (StatsUtil.SPOUT.equals(type)) {
- return new BuiltinSpoutMetrics((SpoutExecutorStats) stats);
- } else if (StatsUtil.BOLT.equals(type)) {
- return new BuiltinBoltMetrics((BoltExecutorStats) stats);
- }
- throw new RuntimeException("Invalid component type!");
- }
-
public static void registerIconnectionServerMetric(Object server, Map<String, Object> topoConf, TopologyContext context) {
if (server instanceof IStatefulObject) {
registerMetric("__recv-iconnection", new StateMetric((IStatefulObject) server), topoConf, context);
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
index a5f26b8..cd326f0 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java
@@ -22,24 +22,25 @@ import org.apache.storm.metric.api.CountMetric;
public class SpoutThrottlingMetrics extends BuiltinMetrics {
private final CountMetric skippedMaxSpoutMs = new CountMetric();
- private final CountMetric skippedThrottleMs = new CountMetric();
private final CountMetric skippedInactiveMs = new CountMetric();
+ private final CountMetric skippedBackPressureMs = new CountMetric();
public SpoutThrottlingMetrics() {
metricMap.put("skipped-max-spout-ms", skippedMaxSpoutMs);
- metricMap.put("skipped-throttle-ms", skippedThrottleMs);
metricMap.put("skipped-inactive-ms", skippedInactiveMs);
+ metricMap.put("skipped-backpressure-ms", skippedBackPressureMs);
+
}
public void skippedMaxSpoutMs(long ms) {
this.skippedMaxSpoutMs.incrBy(ms);
}
- public void skippedThrottleMs(long ms) {
- this.skippedThrottleMs.incrBy(ms);
- }
-
public void skippedInactiveMs(long ms) {
this.skippedInactiveMs.incrBy(ms);
}
+
+ /**
+ * Increments the count metric registered as "skipped-backpressure-ms".
+ *
+ * @param ms amount to add to the counter
+ */
+ public void skippedBackPressureMs(long ms) {
+ this.skippedBackPressureMs.incrBy(ms);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
index 29e75ed..42d2bb6 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
@@ -123,8 +123,8 @@ public class ClientSupervisorUtils {
}
final Process process = builder.start();
if (logPrefix != null || exitCodeCallback != null) {
- Utils.asyncLoop(new Callable<Object>() {
- public Object call() {
+ Utils.asyncLoop(new Callable<Long>() {
+ public Long call() {
if (logPrefix != null ) {
Utils.readAndLogStream(logPrefix,
process.getInputStream());
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
new file mode 100644
index 0000000..9b31c1a
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.utils.JCQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.storm.Constants.SYSTEM_TASK_ID;
+
+/**
+ * Tracks the BackPressure status using a {@code Map<TaskId, JCQueue>}.
+ * The special value NONE is used to indicate that a task is not under BackPressure;
+ * any other value is the receive queue that was recorded when BackPressure was reported for the task.
+ * ConcurrentHashMap does not allow storing null values, so we use the special value NONE instead.
+ */
+public class BackPressureTracker {
+ // sentinel instance; it is only ever compared by reference (== / !=) in this class
+ private static final JCQueue NONE = new JCQueue ("NoneQ", 2, 0, 1, null) { };
+
+ static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
+
+ private final Map<Integer, JCQueue> tasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
+ private final String workerId;
+
+ /**
+ * Registers every task in {@code allLocalTasks}, except the system task, as not under BackPressure.
+ *
+ * @param workerId id that is passed on to each BackPressureStatus created by getCurrStatus()
+ * @param allLocalTasks task ids to track
+ */
+ public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
+ this.workerId = workerId;
+ for (Integer taskId : allLocalTasks) {
+ if(taskId != SYSTEM_TASK_ID) {
+ tasks.put(taskId, NONE); // all tasks are considered to be not under BP initially
+ }
+ }
+ }
+
+ // marks the task as not under BP by storing the NONE sentinel
+ private void recordNoBackPressure(Integer taskId) {
+ tasks.put(taskId, NONE);
+ }
+
+ /**
+ * Record BP for a task.
+ * This is called by transferLocalBatch() on NettyWorker thread.
+ *
+ * @param taskId the task that is under BP
+ * @param recvQ the queue stored for the task; refreshBpTaskList() later checks its overflow
+ * @return true if an update was recorded, false if taskId is already under BP
+ * (also false when taskId had no entry at all, since the previous value is then null)
+ */
+ public boolean recordBackPressure(Integer taskId, JCQueue recvQ) {
+ return tasks.put(taskId, recvQ) == NONE;
+ }
+
+ /**
+ * Resets to "no BP" every task whose recorded queue reports an empty overflow.
+ *
+ * @return true if there was a change in the BP situation
+ */
+ public boolean refreshBpTaskList() {
+ boolean changed = false;
+ LOG.debug("Running Back Pressure status change check");
+ for ( Entry<Integer, JCQueue> entry : tasks.entrySet()) {
+ // only tasks currently marked as under BP are examined
+ if (entry.getValue() != NONE && entry.getValue().isEmptyOverflow()) {
+ recordNoBackPressure(entry.getKey());
+ changed = true;
+ }
+ }
+ return changed;
+ }
+
+ /**
+ * Builds a status object from the current map contents.
+ *
+ * @return a BackPressureStatus created from the worker id, the ids of tasks under BP and the ids of tasks not under BP
+ */
+ public BackPressureStatus getCurrStatus() {
+ // both lists are presized to the total task count
+ ArrayList<Integer> bpTasks = new ArrayList<>(tasks.size());
+ ArrayList<Integer> nonBpTasks = new ArrayList<>(tasks.size());
+
+ for (Entry<Integer, JCQueue> entry : tasks.entrySet()) {
+ JCQueue q = entry.getValue();
+ if (q != NONE) {
+ bpTasks.add(entry.getKey());
+ } else {
+ nonBpTasks.add(entry.getKey());
+ }
+ }
+ return new BackPressureStatus(workerId, bpTasks, nonBpTasks);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bc4c4807/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 57ceb39..ca0d4d0 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -19,7 +19,6 @@
package org.apache.storm.daemon.worker;
import com.google.common.base.Preconditions;
-import com.lmax.disruptor.EventHandler;
import java.io.File;
import java.io.IOException;
@@ -31,7 +30,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -41,6 +39,7 @@ import javax.security.auth.Subject;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.ObjectUtils;
import org.apache.storm.Config;
+import org.apache.storm.Constants;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
@@ -60,21 +59,19 @@ import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LogConfig;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IContext;
-import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.IAutoCredentials;
import org.apache.storm.stats.StatsUtil;
import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.DisruptorBackpressureCallback;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.WorkerBackpressureCallback;
-import org.apache.storm.utils.WorkerBackpressureThread;
+import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;
public class Worker implements Shutdownable, DaemonCommon {
@@ -93,9 +90,7 @@ public class Worker implements Shutdownable, DaemonCommon {
private WorkerState workerState;
private AtomicReference<List<IRunningExecutor>> executorsAtom;
private Thread transferThread;
- private WorkerBackpressureThread backpressureThread;
- // How long until the backpressure znode is invalid.
- private long backpressureZnodeTimeoutMs;
+
private AtomicReference<Credentials> credentialsAtom;
private Subject subject;
private Collection<IAutoCredentials> autoCreds;
@@ -151,138 +146,160 @@ public class Worker implements Shutdownable, DaemonCommon {
}
autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
subject = AuthUtils.populateSubject(null, autoCreds, initCreds);
- backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000;
-
- Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
- @Override public Object run() throws Exception {
- workerState =
- new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
- stormClusterState, autoCreds);
-
- // Heartbeat here so that worker process dies if this fails
- // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
- // that worker is running and moves on
- doHeartBeat();
-
- executorsAtom = new AtomicReference<>(null);
-
- // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
- // to the supervisor
- workerState.heartbeatTimer
- .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
- try {
- doHeartBeat();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
-
- workerState.executorHeartbeatTimer
- .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
- Worker.this::doExecutorHeartbeats);
-
- workerState.registerCallbacks();
-
- workerState.refreshConnections(null);
-
- workerState.activateWorkerWhenAllConnectionsReady();
-
- workerState.refreshStormActive(null);
-
- workerState.runWorkerStartHooks();
-
- List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
- for (List<Long> e : workerState.getExecutors()) {
- if (ConfigUtils.isLocalMode(topologyConf)) {
- newExecutors.add(
- LocalExecutor.mkExecutor(workerState, e, initCreds)
- .execute());
- } else {
- newExecutors.add(
- Executor.mkExecutor(workerState, e, initCreds)
- .execute());
- }
+
+ Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
+ () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials)
+ );
+
+ }
+
+ private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
+ Map<String, String> initCreds, Credentials initialCredentials)
+ throws Exception {
+ workerState = new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
+ stormClusterState, autoCreds);
+
+ // Heartbeat here so that worker process dies if this fails
+ // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
+ // that worker is running and moves on
+ doHeartBeat();
+
+ executorsAtom = new AtomicReference<>(null);
+
+ // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
+ // to the supervisor
+ workerState.heartbeatTimer
+ .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+ try {
+ doHeartBeat();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ workerState.executorHeartbeatTimer
+ .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+ Worker.this::doExecutorHeartbeats);
+
+ workerState.registerCallbacks();
+
+ workerState.refreshConnections(null);
+
+ workerState.activateWorkerWhenAllConnectionsReady();
+
+ workerState.refreshStormActive(null);
+
+ workerState.runWorkerStartHooks();
+
+ List<Executor> execs = new ArrayList<>();
+ for (List<Long> e : workerState.getLocalExecutors()) {
+ if (ConfigUtils.isLocalMode(topologyConf)) {
+ Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
+ execs.add(executor);
+ for (int i = 0; i < executor.getTaskIds().size(); ++i) {
+ workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
}
- executorsAtom.set(newExecutors);
-
- EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
- .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
-
- // This thread will publish the messages destined for remote tasks to remote connections
- transferThread = Utils.asyncLoop(() -> {
- workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
- return 0L;
- });
-
- DisruptorBackpressureCallback disruptorBackpressureHandler =
- mkDisruptorBackpressureHandler(workerState);
- workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
- workerState.transferQueue
- .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
- workerState.transferQueue
- .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
- workerState.transferQueue
- .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-
- WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler(topologyConf);
- backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
- if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
- backpressureThread.start();
- stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, workerState::refreshThrottle);
-
- int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
- workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
+ } else {
+ Executor executor = Executor.mkExecutor(workerState, e, initCreds);
+ for (int i = 0; i < executor.getTaskIds().size(); ++i) {
+ workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
}
+ execs.add(executor);
+ }
+ }
+
+ List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
+ for (Executor executor : execs) {
+ newExecutors.add(executor.execute());
+ }
+ executorsAtom.set(newExecutors);
+
+
+ // This thread will send out messages destined for remote tasks (on other workers).
+ // It is only created when the topology is configured with more than one worker.
+ // The worker count is read through ObjectReader.getInt (as setupBackPressureCheckTimer does)
+ // instead of a (Long) cast, so an Integer-valued setting does not raise a ClassCastException.
+ if (ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS)) > 1) {
+ transferThread = workerState.makeTransferThread();
+ transferThread.setName("Worker-Transfer");
+ }
+
+ credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
- credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
-
- establishLogSettingCallback();
-
- workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
-
- workerState.refreshCredentialsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
- @Override public void run() {
- checkCredentialsChanged();
- if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
- checkThrottleChanged();
- }
- }
- });
-
- workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
- (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
- @Override public void run() {
- try {
- LOG.debug("Checking if blobs have updated");
- updateBlobUpdates();
- } catch (IOException e) {
- // IOException from reading the version files to be ignored
- LOG.error(e.getStackTrace().toString());
- }
- }
- });
-
- // The jitter allows the clients to get the data at different times, and avoids thundering herd
- if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
- workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
+ establishLogSettingCallback();
+
+ workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
+
+ workerState.refreshCredentialsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), () -> {
+ checkCredentialsChanged();
+ });
+
+ // Periodically check for updated blobs; the period falls back to 10 when the setting is absent.
+ workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
+ (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10),
+ () -> {
+ try {
+ LOG.debug("Checking if blobs have updated");
+ updateBlobUpdates();
+ } catch (IOException e) {
+ // IOException from reading the version files to be ignored (the timer keeps running).
+ // Pass the exception itself to the logger: getStackTrace().toString() only yields the
+ // array's identity string, not the stack trace.
+ LOG.error("Failed to check for updated blobs", e);
+ }
+ }
+ );
- workerState.refreshConnectionsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
+ // The jitter allows the clients to get the data at different times, and avoids thundering herd
+ if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
+ workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
+ }
- workerState.resetLogLevelsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
+ workerState.refreshConnectionsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
- workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
- workerState::refreshStormActive);
+ workerState.resetLogLevelsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
- LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
- LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
- return this;
- };
- });
+ workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
+ workerState::refreshStormActive);
+ setupFlushTupleTimer(topologyConf, newExecutors);
+ setupBackPressureCheckTimer(topologyConf);
+
+ LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+ LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
+ return this;
+ }
+
+ /**
+ * Schedules the recurring task that publishes a flush tuple to the given executors.
+ * Nothing is scheduled when both the producer and transfer batch sizes are 1, or when the
+ * flush interval is 0.
+ *
+ * @param topologyConf topology configuration the batch sizes and flush interval are read from
+ * @param executors executors that receive the flush tuple
+ */
+ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final List<IRunningExecutor> executors) {
+ final Integer producerBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+ final Integer xferBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
+ final Long flushIntervalMillis = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));
+ if ((producerBatchSize == 1 && xferBatchSize == 1) || flushIntervalMillis == 0) {
+ LOG.info("Flush Tuple generation disabled. producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", producerBatchSize, xferBatchSize, flushIntervalMillis);
+ return;
+ }
+
+ // NOTE(review): the two leading arguments are presumably (initial delay, period) in millis - confirm against StormTimer
+ workerState.flushTupleTimer.scheduleRecurringMs(flushIntervalMillis, flushIntervalMillis,
+ () -> {
+ // send flush tuple to all local executors
+ for (int i = 0; i < executors.size(); i++) {
+ IRunningExecutor exec = executors.get(i);
+ // executors whose id starts with the system task id are skipped
+ if (exec.getExecutorId().get(0) != Constants.SYSTEM_TASK_ID) {
+ exec.publishFlushTuple();
+ }
+ }
+ }
+ );
+ LOG.info("Flush tuple will be generated every {} millis", flushIntervalMillis);
+ }
+
+ /**
+ * Schedules the recurring call to {@code workerState.refreshBackPressureStatus()}.
+ * Nothing is scheduled when the topology is configured with one worker or fewer.
+ *
+ * @param topologyConf topology configuration the worker count and check interval are read from
+ */
+ private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
+ final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
+ if (workerCount <= 1) {
+ LOG.info("BackPressure change checking is disabled as there is only one worker");
+ return;
+ }
+ final Long bpCheckIntervalMs = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_CHECK_MILLIS));
+ // NOTE(review): the two leading arguments are presumably (initial delay, period) in millis - confirm against StormTimer
+ workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs
+ , bpCheckIntervalMs, () -> workerState.refreshBackPressureStatus());
+ LOG.info("BackPressure status change checking will be performed every {} millis", bpCheckIntervalMs);
+ }
public void doRefreshLoad() {
@@ -297,7 +314,7 @@ public class Worker implements Shutdownable, DaemonCommon {
public void doHeartBeat() throws IOException {
LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
- workerState.executors.stream()
+ workerState.localExecutors.stream()
.map(executor -> new ExecutorInfo(executor.get(0).intValue(), executor.get(1).intValue()))
.collect(Collectors.toList()), workerState.port));
state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
@@ -308,11 +325,10 @@ public class Worker implements Shutdownable, DaemonCommon {
Map<List<Integer>, ExecutorStats> stats;
List<IRunningExecutor> executors = this.executorsAtom.get();
if (null == executors) {
- stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.executors);
+ stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.localExecutors);
} else {
stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
- .toMap((Function<IRunningExecutor, List<Long>>) IRunningExecutor::getExecutorId,
- (Function<IRunningExecutor, ExecutorStats>) IRunningExecutor::renderStats)));
+ .toMap(IRunningExecutor::getExecutorId, IRunningExecutor::renderStats)));
}
Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime());
try {
@@ -356,7 +372,7 @@ public class Worker implements Shutdownable, DaemonCommon {
public void checkCredentialsChanged() {
Credentials newCreds = workerState.stormClusterState.credentials(topologyId, null);
- if (! ObjectUtils.equals(newCreds, credentialsAtom.get())) {
+ if (!ObjectUtils.equals(newCreds, credentialsAtom.get())) {
// This does not have to be atomic, worst case we update when one is not needed
AuthUtils.updateSubject(subject, autoCreds, (null == newCreds) ? null : newCreds.get_creds());
for (IRunningExecutor executor : executorsAtom.get()) {
@@ -366,12 +382,6 @@ public class Worker implements Shutdownable, DaemonCommon {
}
}
- public void checkThrottleChanged() {
- boolean throttleOn =
- workerState.stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::checkThrottleChanged);
- workerState.throttleOn.set(throttleOn);
- }
-
public void checkLogConfigChanged() {
LogConfig logConfig = workerState.stormClusterState.topologyLogConfig(topologyId, null);
logConfigManager.processLogConfigChange(logConfig);
@@ -382,71 +392,8 @@ public class Worker implements Shutdownable, DaemonCommon {
workerState.stormClusterState.topologyLogConfig(topologyId, this::checkLogConfigChanged);
}
-
- /**
- * make a handler for the worker's send disruptor queue to
- * check highWaterMark and lowWaterMark for backpressure.
- */
- private DisruptorBackpressureCallback mkDisruptorBackpressureHandler(WorkerState workerState) {
- return new DisruptorBackpressureCallback() {
- @Override public void highWaterMark() throws Exception {
- LOG.debug("worker {} transfer-queue is congested, checking backpressure state", workerState.workerId);
- WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
- }
-
- @Override public void lowWaterMark() throws Exception {
- LOG.debug("worker {} transfer-queue is not congested, checking backpressure state", workerState.workerId);
- WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
- }
- };
- }
-
- /**
- * make a handler that checks and updates worker's backpressure flag.
- */
- private WorkerBackpressureCallback mkBackpressureHandler(Map<String, Object> topologyConf) {
- final List<IRunningExecutor> executors = executorsAtom.get();
- final long updateFreqMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS)) * 1000;
- return new WorkerBackpressureCallback() {
- @Override public void onEvent(Object obj) {
- if (null != executors) {
- String topologyId = workerState.topologyId;
- String assignmentId = workerState.assignmentId;
- int port = workerState.port;
- IStormClusterState stormClusterState = workerState.stormClusterState;
- long prevBackpressureTimestamp = workerState.backpressure.get();
- long currTimestamp = System.currentTimeMillis();
- long currBackpressureTimestamp = 0;
- // the backpressure flag is true if at least one of the disruptor queues has throttle-on
- boolean backpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
- .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
-
- if (backpressureFlag) {
- // update the backpressure timestamp every updateFreqMs ms
- if ((currTimestamp - prevBackpressureTimestamp) > updateFreqMs) {
- currBackpressureTimestamp = currTimestamp;
- } else {
- currBackpressureTimestamp = prevBackpressureTimestamp;
- }
- }
-
- if (currBackpressureTimestamp != prevBackpressureTimestamp) {
- try {
- LOG.debug("worker backpressure timestamp changing from {} to {}",
- prevBackpressureTimestamp, currBackpressureTimestamp);
- stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureTimestamp);
- // doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
- workerState.backpressure.set(currBackpressureTimestamp);
- } catch (Exception ex) {
- LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex);
- }
- }
- }
- }
- };
- }
-
- @Override public void shutdown() {
+ @Override
+ public void shutdown() {
try {
LOG.info("Shutting down worker {} {} {}", topologyId, assignmentId, port);
@@ -466,32 +413,32 @@ public class Worker implements Shutdownable, DaemonCommon {
// in which case it's a noop
workerState.mqContext.term();
LOG.info("Shutting down transfer thread");
- workerState.transferQueue.haltWithInterrupt();
+ workerState.haltWorkerTransfer();
- transferThread.interrupt();
- transferThread.join();
- LOG.info("Shut down transfer thread");
- backpressureThread.terminate();
- LOG.info("Shut down backpressure thread");
+ if (transferThread != null) {
+ transferThread.interrupt();
+ transferThread.join();
+ LOG.info("Shut down transfer thread");
+ }
workerState.heartbeatTimer.close();
workerState.refreshConnectionsTimer.close();
workerState.refreshCredentialsTimer.close();
workerState.checkForUpdatedBlobsTimer.close();
- workerState.refreshBackpressureTimer.close();
workerState.refreshActiveTimer.close();
workerState.executorHeartbeatTimer.close();
workerState.userTimer.close();
workerState.refreshLoadTimer.close();
workerState.resetLogLevelsTimer.close();
+ workerState.flushTupleTimer.close();
+ workerState.backPressureCheckTimer.close();
workerState.closeResources();
LOG.info("Trigger any worker shutdown hooks");
workerState.runWorkerShutdownHooks();
workerState.stormClusterState.removeWorkerHeartbeat(topologyId, assignmentId, (long) port);
- workerState.stormClusterState.removeWorkerBackpressure(topologyId, assignmentId, (long) port);
LOG.info("Disconnecting from storm cluster state context");
workerState.stormClusterState.disconnect();
workerState.stateStorage.close();
@@ -502,16 +449,17 @@ public class Worker implements Shutdownable, DaemonCommon {
}
- @Override public boolean isWaiting() {
+ @Override
+ public boolean isWaiting() {
return workerState.heartbeatTimer.isTimerWaiting()
&& workerState.refreshConnectionsTimer.isTimerWaiting()
&& workerState.refreshLoadTimer.isTimerWaiting()
&& workerState.refreshCredentialsTimer.isTimerWaiting()
&& workerState.checkForUpdatedBlobsTimer.isTimerWaiting()
- && workerState.refreshBackpressureTimer.isTimerWaiting()
&& workerState.refreshActiveTimer.isTimerWaiting()
&& workerState.executorHeartbeatTimer.isTimerWaiting()
- && workerState.userTimer.isTimerWaiting();
+ && workerState.userTimer.isTimerWaiting()
+ && workerState.flushTupleTimer.isTimerWaiting();
}
public static void main(String[] args) throws Exception {