Posted to dev@storm.apache.org by roshannaik <gi...@git.apache.org> on 2018/01/05 04:16:41 UTC
[GitHub] storm pull request #2502: new PR for STORM-2306
GitHub user roshannaik opened a pull request:
https://github.com/apache/storm/pull/2502
new PR for STORM-2306
Since the [old PR page](https://github.com/apache/storm/pull/2241) had become unusable (due to lots of old comments), I am creating a new PR for the same change.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/roshannaik/storm STORM-2306-2
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/2502.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2502
----
commit 25cce136d256ea07183d6503246abe6c211ac522
Author: Roshan Naik <ro...@...>
Date: 2017-07-25T03:01:00Z
Messaging subsystem redesign. Rebased to latest master. Validated compilation and a few simple topo runs. C->N: 8 mil/sec. 2.8 mil/sec with 2 workers. 1 mil/sec with ack (30-800 microsec). C->ID->N: 6.2 mil/sec.
commit 4153a2f6fb5036a216b285ca00c3cce8656f9996
Author: Roshan Naik <ro...@...>
Date: 2017-07-26T03:34:48Z
addressing satish review comments
commit 2f070c30d09469a08edcb752ebdece61f4abddab
Author: Roshan Naik <ro...@...>
Date: 2017-08-01T07:39:10Z
addressing review comments
commit ec4430616428d79c852e5a6a1fe0c64c61c4d021
Author: Roshan Naik <ro...@...>
Date: 2017-08-02T06:41:35Z
Added Bolt Sleep strategy
commit 4a76691e744e16de9a58a44e359330856373da79
Author: Roshan Naik <ro...@...>
Date: 2017-08-03T19:38:42Z
Added BackPressure Sleep Strategy. Shutdown bug fix
commit b052564ac7b66537815b7a0137a558f2927ddf98
Author: Roshan Naik <ro...@...>
Date: 2017-08-04T03:56:12Z
Changing defaults for tighter latency and favor lower/medium throughput topologies
commit 8eea6aa0457c280ad582a3112b6828b6c9e60a00
Author: Roshan Naik <ro...@...>
Date: 2017-08-08T03:29:31Z
Some minor simplifications. overriding settings within storm-perf topos as needed
commit 300a4f84eed239c08ea1f2ac0c4945353f0555e6
Author: Roshan Naik <ro...@...>
Date: 2017-08-08T03:30:17Z
resolving runtime exception issues due to conflict with STORM-2672
commit 23b5f0979f44f157c0634296bd311ad6b290e276
Author: Robert Evans <ev...@...>
Date: 2017-08-08T15:44:14Z
Merge branch 'STORM-2306m' of https://github.com/roshannaik/storm into STORM-2306
This is a test, this is only a test
commit 1c628d1ba1eeea81e21b5d59a5cb5eaabea6c03b
Author: Robert Evans <ev...@...>
Date: 2017-08-08T20:44:51Z
FIX some of the tests and metrics for the system bolt
commit b75192e1ae0311747bf00b5a6d320fc0e662183f
Author: roshannaik <ro...@...>
Date: 2017-08-11T20:07:03Z
Merge pull request #3 from revans2/STORM-2306
FIX some of the tests and metrics for the system bolt
commit 2506c6eaa9f04f3c57e6875d6078135ac99b8163
Author: Roshan Naik <ro...@...>
Date: 2017-08-12T00:37:58Z
System Bolt doesn't get much incoming traffic, so it can use its own pre-defined sleep strategy (consider making this configurable if useful)
commit 47579ea936cde21f87973ad4b73226737bfe806e
Author: Roshan Naik <ro...@...>
Date: 2017-08-15T09:01:09Z
Fixing process_latency bug reported by Bobby
commit 4704245e44b0868cfa5237f97a1fca0686df11a9
Author: Roshan Naik <ro...@...>
Date: 2017-08-18T03:33:55Z
Fixing the bug of wrong conf used for flushing as reported by Bobby. Adding Perf tuning document
commit 5cb42cbb247052eac1c7858192db09a583921258
Author: Roshan Naik <ro...@...>
Date: 2017-08-19T00:41:20Z
Bug fix in BackPressure mode WaitStrategy to use the right settings. Updates to Performance.md
commit 942fbe6fd60fffe9d1cdc08e960d4b7752fc69ad
Author: Roshan Naik <ro...@...>
Date: 2017-08-21T21:35:29Z
Arun's fix for OOM issue in Netty code (workerTransfer)
commit 5c223be2f774de9a9f4dded8450ef181f897a67a
Author: Roshan Naik <ro...@...>
Date: 2017-08-22T03:24:56Z
Flush Tuples need to be put into the Worker Transfer Queue as well
commit b1a99a038319dfe15d4ac4dfde819f0008ce5291
Author: Roshan Naik <ro...@...>
Date: 2017-08-28T23:11:40Z
Allow ConstSpout to have a configurable sleep for throttling. Improve defaults for sleep strategy, queue size & max.spout.pending.
commit f9beb5358c91a799bec49f071a8f68153042a48a
Author: Roshan Naik <ro...@...>
Date: 2017-08-29T05:29:31Z
Revert changes in ShuffleGrouping until there is consensus on thread safety in groupers.
commit 6cd1424956a0d95f36a6b10158107dfd4ac02a1b
Author: Roshan Naik <ro...@...>
Date: 2017-08-31T10:54:24Z
Introduce overflow to avoid deadlocked cycles involving ACKer. Added more non-blocking methods to JCQueue. Reverting max.spout.pending to null. Minor updates to ConstSpoutNullBolt Topo. Noticed a bug (not fixed) that can lead to OOM in multiworker mode
commit 6bfda34cfdc945cd0cb58c7e13369d577129e6b2
Author: roshannaik <ro...@...>
Date: 2017-08-31T11:03:59Z
Merge branch 'master' into STORM-2306m
commit ece4ce2d8c829da53cfb48448d4f988110e237aa
Author: Roshan Naik <ro...@...>
Date: 2017-09-01T21:10:34Z
bug fix in SpoutOutputCollectorImpl.emit()
commit ba0b9e2a2a5d5a99c50f2506a7f76a261fa19fc0
Author: Roshan Naik <ro...@...>
Date: 2017-09-14T07:47:00Z
Moving worker xsfer code out of WorkerState into WorkerTransfer.java - interworker=3mill/sec. withAck=700k/s,1.3ms
commit c275bfc11c12ff743ccf26c5bbc8db08e7c34030
Author: Roshan Naik <ro...@...>
Date: 2017-11-14T23:27:31Z
- Squishing recent commits related to Interworker backpressure communication
- Fixed some issues found during multi-worker testing
- Fixes to perf topos to pick up cmd line args correctly
- Removing support for single producer mode in JCQueue
- Almost all msg transfers are non-blocking now (other than metrics and credentials-change notifications). Allows spouts and bolts to process metrics ticks even if the output path is choked due to backpressure
- Some temporary debug logs that need to be removed after testing
- Needs more multiworker mode testing
commit a6f84c6784eb1a66b968b6b01262e76a136f3d89
Author: Roshan Naik <ro...@...>
Date: 2017-11-30T10:19:33Z
- Fixing bug in tryTransferLocal() that caused Trident issues.
- Adding workerID to BpStatus for better debuggability
- Logging the length of an idle stretch for BP & max.spout.pending wait situations
- Changes to defaults: topology.executor.receive.buffer.size=32k (rounding up to power of 2), topology.flush.tuple.freq.millis=1 (same as master)
- minor fixes and improvements
commit d0aa1ad27b9375a2c393c760427e24f3158441b8
Author: Roshan Naik <ro...@...>
Date: 2017-11-30T10:21:52Z
Change logLevels. Fix unit test
commit b6fc2b8aaa92c41f786b53039989b373cbf6cd1d
Author: Roshan Naik <ro...@...>
Date: 2017-12-03T09:08:35Z
Addressing minor TODOs
commit 39b395d4f5b3c5fa50a1ebdb08333264d40a9308
Author: Roshan Naik <ro...@...>
Date: 2017-12-05T01:12:21Z
- Renaming config settings for easier understanding ( Progressive Wait Strategy settings + topology.flush.tuple.freq.millis -> topology.batch.flush.interval.millis )
commit 1f35bc9c16795177704ffd8eed4500aa735c24e3
Author: Roshan Naik <ro...@...>
Date: 2017-12-05T01:13:34Z
- Added message Drop Metrics
- Added BackPressure unit tests
commit b179a1937ec46ac0651c88b15b9e35dbfd55aef6
Author: Roshan Naik <ro...@...>
Date: 2017-12-05T05:05:29Z
Client.send throws IOException if connection closes. Allows it to recover from remote worker dying
----
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159960500
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex
}
}
+ public Queue<AddressedTuple> getPendingEmits() {
+ return pendingEmits;
+ }
+
/**
* separated from mkExecutor in order to replace executor transfer in executor data for testing
*/
public ExecutorShutdown execute() throws Exception {
LOG.info("Loading executor tasks " + componentId + ":" + executorId);
- registerBackpressure();
- Utils.SmartThread systemThreads =
- Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
-
String handlerName = componentId + "-executor" + executorId;
- Utils.SmartThread handlers =
+ Utils.SmartThread handler =
Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
setupTicks(StatsUtil.SPOUT.equals(type));
LOG.info("Finished loading executor " + componentId + ":" + executorId);
- return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
+ return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask);
}
public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
- @SuppressWarnings("unchecked")
@Override
- public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
- ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
- for (AddressedTuple addressedTuple : addressedTuples) {
- TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
- int taskId = addressedTuple.getDest();
- if (isDebug) {
- LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
- }
+ public void accept(Object event) {
+ if (event == JCQueue.INTERRUPT) {
+ throw new RuntimeException(new InterruptedException("JCQ processing interrupted") );
+ }
+ AddressedTuple addressedTuple = (AddressedTuple)event;
+ int taskId = addressedTuple.getDest();
+
+ TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+ if (isDebug) {
+ LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
+ }
+
+ try {
if (taskId != AddressedTuple.BROADCAST_DEST) {
tupleActionFn(taskId, tuple);
} else {
for (Integer t : taskIds) {
tupleActionFn(t, tuple);
}
}
+ } catch (Exception e) {
+ throw new RuntimeException(e);
--- End diff --
Where is this exception handled?
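As a general illustration of the pattern being questioned (a minimal sketch, not Storm's code; `WrappedExceptionSketch`, `accept` and `consume` are hypothetical names): once a checked exception is rethrown inside a `RuntimeException`, whichever caller eventually catches it has to inspect `getCause()` to see the original failure.

```java
// Hypothetical sketch; class and method names are illustrative, not Storm APIs.
class WrappedExceptionSketch {

    // Mirrors the pattern in the diff: a checked exception is rethrown unchecked.
    static void accept(Object event) {
        try {
            if ("bad".equals(event)) {
                throw new Exception("tuple action failed");
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // A caller that wants the original failure has to look at getCause().
    static String consume(Object event) {
        try {
            accept(event);
            return "ok";
        } catch (RuntimeException e) {
            Throwable cause = e.getCause();
            return "failed: " + (cause != null ? cause.getMessage() : e.getMessage());
        }
    }

    public static void main(String[] args) {
        System.out.println(consume("good"));
        System.out.println(consume("bad"));
    }
}
```

If no caller catches the wrapper, it propagates to the top of the thread that runs the loop, which is why the location of the handler matters.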
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159958378
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
@@ -155,134 +150,159 @@ public void start() throws Exception {
Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
@Override public Object run() throws Exception {
- workerState =
- new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
+ return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials);
+ }
+ }); // Subject.doAs(...)
+
+ }
+
+ private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, String> initCreds, Credentials initialCredentials)
+ throws Exception {
+ workerState =
+ new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
stormClusterState);
- // Heartbeat here so that worker process dies if this fails
- // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
- // that worker is running and moves on
- doHeartBeat();
+ // Heartbeat here so that worker process dies if this fails
+ // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
+ // that worker is running and moves on
+ doHeartBeat();
- executorsAtom = new AtomicReference<>(null);
+ executorsAtom = new AtomicReference<>(null);
- // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
- // to the supervisor
- workerState.heartbeatTimer
- .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
- try {
- doHeartBeat();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
+ // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
+ // to the supervisor
+ workerState.heartbeatTimer
+ .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+ try {
+ doHeartBeat();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
- workerState.executorHeartbeatTimer
- .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+ workerState.executorHeartbeatTimer
+ .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
Worker.this::doExecutorHeartbeats);
- workerState.registerCallbacks();
+ workerState.registerCallbacks();
- workerState.refreshConnections(null);
+ workerState.refreshConnections(null);
- workerState.activateWorkerWhenAllConnectionsReady();
+ workerState.activateWorkerWhenAllConnectionsReady();
- workerState.refreshStormActive(null);
+ workerState.refreshStormActive(null);
- workerState.runWorkerStartHooks();
+ workerState.runWorkerStartHooks();
- List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
- for (List<Long> e : workerState.getExecutors()) {
- if (ConfigUtils.isLocalMode(topologyConf)) {
- newExecutors.add(
- LocalExecutor.mkExecutor(workerState, e, initCreds)
- .execute());
- } else {
- newExecutors.add(
- Executor.mkExecutor(workerState, e, initCreds)
- .execute());
- }
- }
- executorsAtom.set(newExecutors);
+ List<Executor> execs = new ArrayList<>();
+ for (List<Long> e : workerState.getExecutors()) {
+ if (ConfigUtils.isLocalMode(topologyConf)) {
+ Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
+ execs.add( executor );
+ workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
+ } else {
+ Executor executor = Executor.mkExecutor(workerState, e, initCreds);
+ workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
+ execs.add(executor);
+ }
+ }
- EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
- .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
+ List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
+ for (Executor executor : execs) {
+ newExecutors.add(executor.execute());
+ }
+ executorsAtom.set(newExecutors);
- // This thread will publish the messages destined for remote tasks to remote connections
- transferThread = Utils.asyncLoop(() -> {
- workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
- return 0L;
- });
+ // This thread will send out messages destined for remote tasks (on other workers)
+ transferThread = workerState.makeTransferThread();
+ transferThread.setName("Worker-Transfer");
- DisruptorBackpressureCallback disruptorBackpressureHandler =
- mkDisruptorBackpressureHandler(workerState);
- workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
- workerState.transferQueue
- .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
- workerState.transferQueue
- .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
- workerState.transferQueue
- .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-
- WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
- backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
- if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
- backpressureThread.start();
- stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
-
- int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
- workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
- }
+ credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
- credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
+ establishLogSettingCallback();
- establishLogSettingCallback();
+ workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
- workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
+ workerState.refreshCredentialsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
+ @Override public void run() {
+ checkCredentialsChanged();
+ }
+ });
- workerState.refreshCredentialsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
- @Override public void run() {
- checkCredentialsChanged();
- if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
- checkThrottleChanged();
- }
+ workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
+ (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
+ @Override public void run() {
+ try {
+ LOG.debug("Checking if blobs have updated");
+ updateBlobUpdates();
+ } catch (IOException e) {
+ // IOException from reading the version files to be ignored
+ LOG.error(e.getStackTrace().toString());
}
- });
-
- workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
- (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
- @Override public void run() {
- try {
- LOG.debug("Checking if blobs have updated");
- updateBlobUpdates();
- } catch (IOException e) {
- // IOException from reading the version files to be ignored
- LOG.error(e.getStackTrace().toString());
- }
- }
- });
-
- // The jitter allows the clients to get the data at different times, and avoids thundering herd
- if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
- workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
- }
+ }
+ });
+
+ // The jitter allows the clients to get the data at different times, and avoids thundering herd
+ if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
+ workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
+ }
+
+ workerState.refreshConnectionsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
- workerState.refreshConnectionsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
+ workerState.resetLogLevelsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
- workerState.resetLogLevelsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
+ workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
+ workerState::refreshStormActive);
- workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
- workerState::refreshStormActive);
+ setupFlushTupleTimer(topologyConf, newExecutors);
+ setupBackPressureCheckTimer(topologyConf);
+
+ LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+ LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
+ return this;
+ }
- LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
- LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
- return this;
- };
+ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final List<IRunningExecutor> executors) {
+ final Integer producerBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+ final Integer xferBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
+ final Long flushIntervalMillis = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));
+ if ((producerBatchSize == 1 && xferBatchSize == 1) || flushIntervalMillis == 0) {
+ LOG.info("Flush Tuple generation disabled. producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", producerBatchSize, xferBatchSize, flushIntervalMillis);
+ return;
+ }
+
+ workerState.flushTupleTimer.scheduleRecurringMs(flushIntervalMillis, flushIntervalMillis, new Runnable() {
+ @Override
+ public void run() {
+ // send flush tuple to all executors
+ for (int i = 0; i < executors.size(); i++) {
+ IRunningExecutor exec = executors.get(i);
+ if (exec.getExecutorId().get(0) != Constants.SYSTEM_TASK_ID) {
+ exec.getExecutor().publishFlushTuple();
+ }
+ }
+ }
});
+ LOG.info("Flush tuple will be generated every {} millis", flushIntervalMillis);
+ }
+ private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
+ final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
+ if (workerCount <= 1) {
+ LOG.info("BackPressure change checking is disabled as there is only one worker");
+ return;
+ }
+ final Long bpCheckIntervalMs = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_CHECK_MILLIS));
+ workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs, bpCheckIntervalMs, new Runnable() {
--- End diff --
nit: Could we use java 8 lambdas here?
```
workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs, bpCheckIntervalMs,
() -> workerState.refreshBackPressureStatus());
```
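To make the equivalence concrete, here is a minimal, self-contained sketch comparing the three forms. `LambdaSketch`, `runOnce` and the static `refreshBackPressureStatus` are hypothetical stand-ins, not Storm's timer or `WorkerState`; the only assumption is that the timer accepts a `Runnable`, as the anonymous class in the diff implies.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins; this is not Storm's timer or WorkerState.
class LambdaSketch {
    static final AtomicInteger refreshCount = new AtomicInteger();

    // Stand-in for workerState.refreshBackPressureStatus().
    static void refreshBackPressureStatus() {
        refreshCount.incrementAndGet();
    }

    // Runs the task once, immediately; a real timer would schedule it repeatedly.
    static void runOnce(Runnable task) {
        task.run();
    }

    public static void main(String[] args) {
        // Pre-Java 8 style: anonymous inner class.
        runOnce(new Runnable() {
            @Override
            public void run() {
                refreshBackPressureStatus();
            }
        });
        // Lambda form, as suggested in the review comment.
        runOnce(() -> refreshBackPressureStatus());
        // Method reference, equivalent when the call takes no arguments.
        runOnce(LambdaSketch::refreshBackPressureStatus);
        System.out.println(refreshCount.get());
    }
}
```

All three forms produce a `Runnable` with the same behavior; the lambda and method reference simply remove the boilerplate.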
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159955584
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java ---
@@ -22,24 +22,25 @@
public class SpoutThrottlingMetrics extends BuiltinMetrics {
private final CountMetric skippedMaxSpoutMs = new CountMetric();
- private final CountMetric skippedThrottleMs = new CountMetric();
private final CountMetric skippedInactiveMs = new CountMetric();
+ private final CountMetric skippedBackPressureMs = new CountMetric();
public SpoutThrottlingMetrics() {
metricMap.put("skipped-max-spout-ms", skippedMaxSpoutMs);
- metricMap.put("skipped-throttle-ms", skippedThrottleMs);
metricMap.put("skipped-inactive-ms", skippedInactiveMs);
+ metricMap.put("skipped-backpressure-ms", skippedBackPressureMs);
--- End diff --
./docs/Metrics.md describes these. It should be updated to replace skipped-throttle-ms with skipped-backpressure-ms (and preferably mention that in older versions of Storm skipped-throttle-ms would have been similar).
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r161359446
--- Diff: storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java ---
@@ -23,7 +23,7 @@
import java.io.Serializable;
import java.util.List;
-public class TupleInfo implements Serializable {
+public final class TupleInfo implements Serializable {
--- End diff --
I think I was wondering whether marking some of the frequently allocated and used objects as final impacts perf. It is not necessary, so I will revert it.
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2502
@roshannaik I may have checked it out wrong, I will try again...
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:
https://github.com/apache/storm/pull/2502
Commits squashed. Thanks. Will use follow-up JIRAs to address any issues discovered.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167328981
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -44,14 +48,15 @@
private final Boolean isEventLoggers;
private final Boolean isDebug;
private final RotatingMap<Long, TupleInfo> pending;
+ private TupleInfo globalTupleInfo = new TupleInfo(); // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed).
--- End diff --
This feels like a good assumption for a spout, but I would like to understand the cost of making this thread safe (thread local instance etc), and at least document it if that cost is high, or preferably find a low cost solution to throw an exception if it does happen.
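One low-cost way to "throw an exception if it does happen" could look like the sketch below. This is an assumption-laden illustration, not something proposed in the PR: `SingleThreadGuard` and `check()` are hypothetical names, and the idea is that each `emit*()` would call `check()` first. The fast path is a single volatile read and a reference comparison.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical guard; the names are illustrative, not Storm APIs.
class SingleThreadGuard {
    // The first thread to call check() becomes the owner.
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    void check() {
        Thread current = Thread.currentThread();
        Thread seen = owner.get();
        if (seen == current) {
            return; // fast path: one volatile read and a comparison
        }
        if (seen == null && owner.compareAndSet(null, current)) {
            return; // first caller claims ownership
        }
        throw new IllegalStateException("called from " + current.getName()
                + " but owned by " + owner.get().getName());
    }

    // Demo: returns true if a call from a second thread is rejected.
    static boolean rejectsOtherThread() {
        SingleThreadGuard guard = new SingleThreadGuard();
        guard.check(); // the calling thread becomes the owner
        guard.check(); // repeated calls from the owner are fine
        boolean[] rejected = {false};
        Thread other = new Thread(() -> {
            try {
                guard.check();
            } catch (IllegalStateException e) {
                rejected[0] = true;
            }
        });
        other.start();
        try {
            other.join(); // join() makes the write to rejected[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return rejected[0];
    }
}
```

The alternative mentioned in the comment, a thread-local instance, would avoid the exception entirely at the cost of a `ThreadLocal` lookup on every emit; which of the two is cheaper in practice would need to be measured.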
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2502
I am still seeing test failures when running `mvn clean install -Pall-tests -fn | tee log.txt`
`integration.org.apache.storm.integration-test`: looks like an NPE at ExecutorTransfer.java:114, which appears to be an issue with initializing the local receive queues.
`storm-starter` is also getting several errors. `test-intermediate-rankings-bolt` is getting an array index out of bounds exception when using `RankableObjectWithFields`.
`test-exclamation-bolt`, `test-total-rankings-bolt` and `test-rolling-bolt` are getting an NPE at GeneralTopologyContext.java:112, which looks like the component-to-stream table is not being set up properly, at least for the Clojure API.
I'll see if there is more that I can come up with, and I'll try to spend more time looking at the actual code changes.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r168015462
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus {
+ public static final short IDENTIFIER = (short)-600;
+ private static final int SIZE_OF_ID = 2; // size if IDENTIFIER
+ private static final int SIZE_OF_INT = 4;
+
+ private static AtomicLong bpCount = new AtomicLong(0);
+
+ public String workerId;
+ public final long id; // monotonically increasing id
--- End diff --
Sorry for leaving two duplicate comments: I commented on an old commit.
Just wondering: being `monotonically increasing` guarantees that `id` is unique within a worker, but not across workers, and it also resets to 0 after a worker crashes and restarts. Could that hurt the backpressure logic in any case?
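The concern can be reproduced with a small model. This is a sketch under stated assumptions, not Storm's `BackPressureStatus`: `WorkerSequence` is a hypothetical class in which each worker owns an `AtomicLong` counter, and a worker restart is modeled by constructing a new instance.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a per-worker sequence; not Storm's BackPressureStatus.
class WorkerSequence {
    private final String workerId;
    private final AtomicLong next = new AtomicLong(0);

    WorkerSequence(String workerId) {
        this.workerId = workerId;
    }

    // Monotonically increasing within this instance only.
    long nextId() {
        return next.getAndIncrement();
    }

    // Qualifying the id with the worker id separates ids from different workers.
    String qualifiedNextId() {
        return workerId + ":" + nextId();
    }

    public static void main(String[] args) {
        WorkerSequence a = new WorkerSequence("worker-a");
        WorkerSequence b = new WorkerSequence("worker-b");
        // Both print 0: bare ids collide across workers.
        System.out.println(a.nextId() + " " + b.nextId());
        // A "restarted" worker-a starts again from 0.
        System.out.println(new WorkerSequence("worker-a").nextId());
        // Qualified ids differ between workers.
        System.out.println(a.qualifiedNextId() + " " + b.qualifiedNextId());
    }
}
```

Whether a collision or reset actually matters depends on how the receiver compares ids, which this sketch does not model.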
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167285848
--- Diff: flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvironmentTest.java ---
@@ -18,6 +18,7 @@
package org.apache.storm.flux.multilang;
+import org.junit.Ignore;
--- End diff --
nit: I don't think this is used in here, so can we remove it?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r168032846
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -22,19 +22,23 @@
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
--- End diff --
nit: better to make this a javadoc so that it is exposed in more places.
As @revans2 stated, I also think this is a good assumption for a spout, but it would be even better to update the restriction if we have documented it anywhere. That's just my 2 cents, not a blocker.
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:
https://github.com/apache/storm/pull/2502
@revans2, UTs for the storm-client module were fixed (including JCQueueTest). They run cleanly for me. Are you seeing them fail in your local setup?
**Rebasing:** Due to the length of the review cycle this PR needed and the need for a stable point for perf runs, I avoided keeping up to date with latest master, so far. Planning to address all review comments first. There are around 300 commits on master to catch up on right now. Will take me a week I think to rebase.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159968476
--- Diff: storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java ---
@@ -76,6 +76,32 @@ public static Integer getInt(Object o, Integer defaultValue) {
throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}
+ public static Long getLong(Object o) {
+ return getLong(o, null);
+ }
+
+ public static Long getLong(Object o, Long defaultValue) {
+ if (null == o) {
+ return defaultValue;
+ }
+
+ if ( o instanceof Long ||
+ o instanceof Integer ||
+ o instanceof Short ||
+ o instanceof Byte) {
+ return ((Number) o).longValue();
+ } else if (o instanceof Double) {
+ final long l = (Long) o;
--- End diff --
This is not going to work here, as a Long can never be larger than MAX_VALUE for a long or smaller than MIN_VALUE for a Long. Technically I think there may be a bug in getInt if the number is larger than a Long can hold.
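A minimal sketch of one way the `Double` branch could be written so that the range check is meaningful: test the `double` value itself before narrowing it. `LongReaderSketch` is a hypothetical class, not the PR's `ObjectReader`, and rejecting non-integral doubles is an assumption made for this example. Note also that casting a boxed `Double` with `(Long) o`, as in the diff, would fail with a `ClassCastException` at runtime.

```java
// Hypothetical sketch; not the ObjectReader implementation from the PR.
class LongReaderSketch {

    static Long getLong(Object o, Long defaultValue) {
        if (o == null) {
            return defaultValue;
        }
        if (o instanceof Long || o instanceof Integer
                || o instanceof Short || o instanceof Byte) {
            return ((Number) o).longValue();
        }
        if (o instanceof Double) {
            double d = (Double) o;
            // Check range and integrality on the double, before narrowing.
            // -0x1p63 is exactly Long.MIN_VALUE; 0x1p63 is one past Long.MAX_VALUE.
            if (d >= -0x1p63 && d < 0x1p63 && d == Math.rint(d)) {
                return (long) d;
            }
            throw new IllegalArgumentException("Cannot represent " + o + " as a long");
        }
        throw new IllegalArgumentException("Don't know how to convert " + o + " to long");
    }

    public static void main(String[] args) {
        System.out.println(getLong(42, null));
        System.out.println(getLong(3.0, null));
        System.out.println(getLong(null, 7L));
    }
}
```

Values such as `1e19`, `NaN` and infinities fall outside the accepted range and are rejected rather than silently clamped by the cast.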
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:
https://github.com/apache/storm/pull/2502
@revans2 and @HeartSaVioR
I just rebased this PR and also included the fix that makes ExecutorTransfer.tryTransfer() thread safe, to allow concurrent emits from background threads spun up by the Bolt/Spout executors.
I am hoping that, after this PR is merged, we can revisit the topic of how to allow concurrent emits without making it internally thread safe.
This fix should unblock this PR and allow us to discuss the issue separately.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r168057328
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -22,19 +22,23 @@
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
--- End diff --
Done.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159959648
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -196,19 +197,21 @@ public static Executor mkExecutor(WorkerState workerState, List<Long> executorId
executor.stats = new BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()),ObjectReader.getInt(executor.getStormConf().get(Config.NUM_STAT_BUCKETS)));
}
+ int minId = Integer.MAX_VALUE;
Map<Integer, Task> idToTask = new HashMap<>();
for (Integer taskId : taskIds) {
+ minId = Math.min(minId, taskId);
try {
Task task = new Task(executor, taskId);
- executor.sendUnanchored(
- task, StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer());
+ task.sendUnanchored( StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer(), null); // TODO: Roshan: does this get delivered/handled anywhere ?
--- End diff --
Answer: storm-core/test/clj/integration/org/apache/storm/integration_test.clj
It can sometimes be helpful with debugging when you have topology.debug enabled, but it is not that critical and can probably be removed if you want to. If not please remove the comment.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159954152
--- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java ---
@@ -193,6 +210,24 @@ public void run() {
});
}
+ /**
+ * Schedule a function to run recurrently
+ * @param delayMs the number of millis to delay before running the function
+ * @param recurMs the time between each invocation
+ * @param func the function to run
+ */
+ public void scheduleRecurringMs(long delayMs, final long recurMs, final Runnable func) {
+ scheduleMs(delayMs, new Runnable() {
--- End diff --
nit: could we use a Java 8 lambda here instead?
```
scheduleMs(delayMs, () -> {
func.run();
// This avoids a race condition with cancel-timer.
scheduleMs(recurMs, this, true, 0);
});
```
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159967636
--- Diff: storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java ---
@@ -24,50 +24,46 @@
import org.apache.storm.task.GeneralTopologyContext;
public class TupleImpl implements Tuple {
- private final List<Object> values;
- private final int taskId;
- private final String streamId;
- private final GeneralTopologyContext context;
- private final MessageId id;
+ private List<Object> values;
+ private int taskId;
+ private String streamId;
+ private GeneralTopologyContext context;
+ private MessageId id;
+ private final String srcComponent;
private Long _processSampleStartTime;
private Long _executeSampleStartTime;
private long _outAckVal = 0;
-
+
public TupleImpl(Tuple t) {
this.values = t.getValues();
this.taskId = t.getSourceTask();
this.streamId = t.getSourceStreamId();
this.id = t.getMessageId();
this.context = t.getContext();
- if (t instanceof TupleImpl) {
+ this.srcComponent = t.getSourceComponent();
+ try {
TupleImpl ti = (TupleImpl) t;
this._processSampleStartTime = ti._processSampleStartTime;
this._executeSampleStartTime = ti._executeSampleStartTime;
this._outAckVal = ti._outAckVal;
+ } catch (ClassCastException e) {
+ // ignore ... if t is not a TupleImpl type .. faster than checking and then casting
}
}
- public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
+ public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId, MessageId id) {
this.values = Collections.unmodifiableList(values);
this.taskId = taskId;
this.streamId = streamId;
this.id = id;
this.context = context;
-
- String componentId = context.getComponentId(taskId);
- Fields schema = context.getComponentOutputFields(componentId, streamId);
- if(values.size()!=schema.size()) {
- throw new IllegalArgumentException(
- "Tuple created with wrong number of fields. " +
- "Expected " + schema.size() + " fields but got " +
- values.size() + " fields");
- }
--- End diff --
I know this slows things down, but it is a good sanity check. Would it be possible to have a way to turn this on through configuration, for example in local mode? We would have to cache the setting though, because just checking the conf might take as long as doing this check.
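A minimal sketch of what such a cached, switchable check could look like. Everything here is hypothetical: the class name `TupleArityCheckSketch` and the idea of passing the schema lookup as an `IntSupplier` are illustrations only, not part of Storm's API. The flag is read once at construction, and the schema lookup runs only when the check is enabled.

```java
import java.util.List;
import java.util.function.IntSupplier;

final class TupleArityCheckSketch {
    // Read from the conf once (for example at executor start-up) and cached,
    // so the per-tuple cost when disabled is a single boolean test.
    private final boolean validate;

    TupleArityCheckSketch(boolean validateFromConf) {
        this.validate = validateFromConf;
    }

    // expectedSize stands in for the schema lookup; it is only invoked when validation is on.
    void check(List<?> values, IntSupplier expectedSize) {
        if (!validate) {
            return;
        }
        int expected = expectedSize.getAsInt();
        if (values.size() != expected) {
            throw new IllegalArgumentException(
                "Tuple created with wrong number of fields. "
                    + "Expected " + expected + " fields but got " + values.size() + " fields");
        }
    }
}
```

The design choice is to keep the decision out of the hot path: the conf map is never consulted per tuple, only the cached boolean.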
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159969910
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
@@ -79,6 +79,7 @@
protected final long lowMemoryThresholdMB;
protected final long mediumMemoryThresholdMb;
protected final long mediumMemoryGracePeriodMs;
+ private static int port = 5006; // TODO: Roshan: remove this after stabilization
--- End diff --
Yes please make sure this is removed, as leaving it in place is a security vulnerability.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167326878
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -313,107 +330,74 @@ public void metricsTick(Task taskData, TupleImpl tuple) {
protected void setupMetrics() {
for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
StormTimer timerTask = workerData.getUserTimer();
- timerTask.scheduleRecurring(interval, interval, new Runnable() {
- @Override
- public void run() {
- TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(interval),
- (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
- List<AddressedTuple> metricsTickTuple =
- Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
- receiveQueue.publish(metricsTickTuple);
+ timerTask.scheduleRecurring(interval, interval,
+ () -> {
+ TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID,
+ (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
+ AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+ try {
+ receiveQueue.publish(metricsTickTuple);
+ receiveQueue.flush(); // avoid buffering
+ } catch (InterruptedException e) {
+ LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
+ Thread.currentThread().interrupt();
+ return;
+ }
}
- });
- }
- }
-
- public void sendUnanchored(Task task, String stream, List<Object> values, ExecutorTransfer transfer) {
- Tuple tuple = task.getTuple(stream, values);
- List<Integer> tasks = task.getOutgoingTasks(stream, values);
- for (Integer t : tasks) {
- transfer.transfer(t, tuple);
- }
- }
-
- /**
- * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api).
- */
- public void sendToEventLogger(Executor executor, Task taskData, List values,
- String componentId, Object messageId, Random random) {
- Map<String, DebugOptions> componentDebug = executor.getStormComponentDebug().get();
- DebugOptions debugOptions = componentDebug.get(componentId);
- if (debugOptions == null) {
- debugOptions = componentDebug.get(executor.getStormId());
- }
- double spct = ((debugOptions != null) && (debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0;
- if (spct > 0 && (random.nextDouble() * 100) < spct) {
- sendUnanchored(taskData, StormCommon.EVENTLOGGER_STREAM_ID,
- new Values(componentId, messageId, System.currentTimeMillis(), values),
- executor.getExecutorTransfer());
+ );
}
}
- public void reflectNewLoadMapping(LoadMapping loadMapping) {
- for (LoadAwareCustomStreamGrouping g : groupers) {
- g.refreshLoad(loadMapping);
- }
- }
-
- private void registerBackpressure() {
- receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback() {
- @Override
- public void highWaterMark() throws Exception {
- LOG.debug("executor " + executorId + " is congested, set backpressure flag true");
- WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
- }
-
- @Override
- public void lowWaterMark() throws Exception {
- LOG.debug("executor " + executorId + " is not-congested, set backpressure flag false");
- WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
- }
- });
- receiveQueue.setHighWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
- receiveQueue.setLowWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
- receiveQueue.setEnableBackpressure(ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false));
- }
-
protected void setupTicks(boolean isSpout) {
final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
- boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
if (tickTimeSecs != null) {
+ boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId))
|| (!enableMessageTimeout && isSpout)) {
- LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
+ LOG.info("Timeouts disabled for executor " + componentId + ":" + executorId);
--- End diff --
nit: why did we go back to String concatenation?
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2502
I want to clarify that my performance tests on the latest code were fairly simple. I did more exhaustive performance tests on an older version of the code that looked good to me, but I didn't save any of the numbers.
I am planning to merge this in shortly once I verify all of the unit tests still pass, but I think we should see if we can reproduce the increased CPU utilization that @HeartSaVioR saw in a follow on JIRA.
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:
https://github.com/apache/storm/pull/2502
I suspect this issue won’t show up on the setups I am using and will need to be triaged on your setup itself. Will work with you offline on that.
BTW, what version of Java are you using?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r161356532
--- Diff: docs/Performance.md ---
@@ -0,0 +1,132 @@
+---
--- End diff --
I am putting a link to this doc from Concepts.md. If you have a better place in mind, please let me know.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r161358493
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
@@ -155,134 +150,159 @@ public void start() throws Exception {
Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
@Override public Object run() throws Exception {
- workerState =
- new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
+ return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials);
+ }
+ }); // Subject.doAs(...)
+
+ }
+
+ private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, String> initCreds, Credentials initialCredentials)
+ throws Exception {
+ workerState =
+ new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
stormClusterState);
- // Heartbeat here so that worker process dies if this fails
- // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
- // that worker is running and moves on
- doHeartBeat();
+ // Heartbeat here so that worker process dies if this fails
+ // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
+ // that worker is running and moves on
+ doHeartBeat();
- executorsAtom = new AtomicReference<>(null);
+ executorsAtom = new AtomicReference<>(null);
- // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
- // to the supervisor
- workerState.heartbeatTimer
- .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
- try {
- doHeartBeat();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
+ // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
+ // to the supervisor
+ workerState.heartbeatTimer
+ .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+ try {
+ doHeartBeat();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
- workerState.executorHeartbeatTimer
- .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+ workerState.executorHeartbeatTimer
+ .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
Worker.this::doExecutorHeartbeats);
- workerState.registerCallbacks();
+ workerState.registerCallbacks();
- workerState.refreshConnections(null);
+ workerState.refreshConnections(null);
- workerState.activateWorkerWhenAllConnectionsReady();
+ workerState.activateWorkerWhenAllConnectionsReady();
- workerState.refreshStormActive(null);
+ workerState.refreshStormActive(null);
- workerState.runWorkerStartHooks();
+ workerState.runWorkerStartHooks();
- List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
- for (List<Long> e : workerState.getExecutors()) {
- if (ConfigUtils.isLocalMode(topologyConf)) {
- newExecutors.add(
- LocalExecutor.mkExecutor(workerState, e, initCreds)
- .execute());
- } else {
- newExecutors.add(
- Executor.mkExecutor(workerState, e, initCreds)
- .execute());
- }
- }
- executorsAtom.set(newExecutors);
+ List<Executor> execs = new ArrayList<>();
+ for (List<Long> e : workerState.getExecutors()) {
+ if (ConfigUtils.isLocalMode(topologyConf)) {
+ Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
+ execs.add( executor );
+ workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
+ } else {
+ Executor executor = Executor.mkExecutor(workerState, e, initCreds);
+ workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
+ execs.add(executor);
+ }
+ }
- EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
- .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
+ List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
+ for (Executor executor : execs) {
+ newExecutors.add(executor.execute());
+ }
+ executorsAtom.set(newExecutors);
- // This thread will publish the messages destined for remote tasks to remote connections
- transferThread = Utils.asyncLoop(() -> {
- workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
- return 0L;
- });
+ // This thread will send out messages destined for remote tasks (on other workers)
+ transferThread = workerState.makeTransferThread();
+ transferThread.setName("Worker-Transfer");
- DisruptorBackpressureCallback disruptorBackpressureHandler =
- mkDisruptorBackpressureHandler(workerState);
- workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
- workerState.transferQueue
- .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
- workerState.transferQueue
- .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
- workerState.transferQueue
- .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-
- WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
- backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
- if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
- backpressureThread.start();
- stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
-
- int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
- workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
- }
+ credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
- credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
+ establishLogSettingCallback();
- establishLogSettingCallback();
+ workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
- workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
+ workerState.refreshCredentialsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
+ @Override public void run() {
+ checkCredentialsChanged();
+ }
+ });
- workerState.refreshCredentialsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
- @Override public void run() {
- checkCredentialsChanged();
- if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
- checkThrottleChanged();
- }
+ workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
+ (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
+ @Override public void run() {
+ try {
+ LOG.debug("Checking if blobs have updated");
+ updateBlobUpdates();
+ } catch (IOException e) {
+ // IOException from reading the version files to be ignored
+ LOG.error(e.getStackTrace().toString());
}
- });
-
- workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
- (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
- @Override public void run() {
- try {
- LOG.debug("Checking if blobs have updated");
- updateBlobUpdates();
- } catch (IOException e) {
- // IOException from reading the version files to be ignored
- LOG.error(e.getStackTrace().toString());
- }
- }
- });
-
- // The jitter allows the clients to get the data at different times, and avoids thundering herd
- if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
- workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
- }
+ }
+ });
+
+ // The jitter allows the clients to get the data at different times, and avoids thundering herd
+ if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
+ workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
+ }
+
+ workerState.refreshConnectionsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
- workerState.refreshConnectionsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
+ workerState.resetLogLevelsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
- workerState.resetLogLevelsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
+ workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
+ workerState::refreshStormActive);
- workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
- workerState::refreshStormActive);
+ setupFlushTupleTimer(topologyConf, newExecutors);
+ setupBackPressureCheckTimer(topologyConf);
+
+ LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
+ LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
+ return this;
+ }
- LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
- LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
- return this;
- };
+ private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final List<IRunningExecutor> executors) {
+ final Integer producerBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+ final Integer xferBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
+ final Long flushIntervalMillis = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));
+ if ((producerBatchSize == 1 && xferBatchSize == 1) || flushIntervalMillis == 0) {
+ LOG.info("Flush Tuple generation disabled. producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", producerBatchSize, xferBatchSize, flushIntervalMillis);
+ return;
+ }
+
+ workerState.flushTupleTimer.scheduleRecurringMs(flushIntervalMillis, flushIntervalMillis, new Runnable() {
+ @Override
+ public void run() {
+ // send flush tuple to all executors
+ for (int i = 0; i < executors.size(); i++) {
+ IRunningExecutor exec = executors.get(i);
+ if (exec.getExecutorId().get(0) != Constants.SYSTEM_TASK_ID) {
+ exec.getExecutor().publishFlushTuple();
+ }
+ }
+ }
});
+ LOG.info("Flush tuple will be generated every {} millis", flushIntervalMillis);
+ }
+ private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
+ final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
+ if (workerCount <= 1) {
+ LOG.info("BackPressure change checking is disabled as there is only one worker");
+ return;
+ }
+ final Long bpCheckIntervalMs = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_CHECK_MILLIS));
+ workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs, bpCheckIntervalMs, new Runnable() {
--- End diff --
fixed
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r168046999
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -22,19 +22,23 @@
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
--- End diff --
Yes please, that's the intent.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167287393
--- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
@@ -40,109 +40,117 @@
import org.apache.storm.nimbus.NimbusInfo;
public interface IStormClusterState {
- public List<String> assignments(Runnable callback);
+ List<String> assignments(Runnable callback);
- public Assignment assignmentInfo(String stormId, Runnable callback);
+ Assignment assignmentInfo(String stormId, Runnable callback);
- public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
+ VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
- public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
+ Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
- public List<String> blobstoreInfo(String blobKey);
+ List<String> blobstoreInfo(String blobKey);
- public List<NimbusSummary> nimbuses();
+ List<NimbusSummary> nimbuses();
- public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
+ void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
- public List<String> activeStorms();
+ List<String> activeStorms();
/**
* Get a storm base for a topology
* @param stormId the id of the topology
* @param callback something to call if the data changes (best effort)
* @return the StormBase or null if it is not alive.
*/
- public StormBase stormBase(String stormId, Runnable callback);
+ StormBase stormBase(String stormId, Runnable callback);
- public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
+ ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
- public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
+ List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
- public List<ProfileRequest> getTopologyProfileRequests(String stormId);
+ List<ProfileRequest> getTopologyProfileRequests(String stormId);
- public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
+ void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
- public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
+ void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
- public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
+ Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
- public List<String> supervisors(Runnable callback);
+ List<String> supervisors(Runnable callback);
- public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
+ SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
- public void setupHeatbeats(String stormId);
+ void setupHeatbeats(String stormId);
- public void teardownHeartbeats(String stormId);
+ void teardownHeartbeats(String stormId);
- public void teardownTopologyErrors(String stormId);
+ void teardownTopologyErrors(String stormId);
- public List<String> heartbeatStorms();
+ List<String> heartbeatStorms();
- public List<String> errorTopologies();
+ List<String> errorTopologies();
- public List<String> backpressureTopologies();
+ /** @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */
--- End diff --
Could you file a JIRA for us to remove it in 3.x?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159954450
--- Diff: storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java ---
@@ -47,7 +47,6 @@
public static final String STORMS_ROOT = "storms";
public static final String SUPERVISORS_ROOT = "supervisors";
public static final String WORKERBEATS_ROOT = "workerbeats";
- public static final String BACKPRESSURE_ROOT = "backpressure";
--- End diff --
In order to support running older topology versions under a newer 2.x nimbus/etc we should still keep around the basic setup and cleanup of the backpressure nodes in zookeeper at least until a 3.x release.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r161357014
--- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java ---
@@ -193,6 +210,24 @@ public void run() {
});
}
+ /**
+ * Schedule a function to run recurrently
+ * @param delayMs the number of millis to delay before running the function
+ * @param recurMs the time between each invocation
+ * @param func the function to run
+ */
+ public void scheduleRecurringMs(long delayMs, final long recurMs, final Runnable func) {
+ scheduleMs(delayMs, new Runnable() {
--- End diff --
Doesn't seem feasible due to the 'this' reference inside the lambda.
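The difference can be shown with a small self-contained sketch. This is not StormTimer itself: `RecurringTimerSketch` and its queue-based `schedule` are hypothetical stand-ins used only to demonstrate how `this` resolves.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class RecurringTimerSketch {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    void schedule(Runnable task) {
        pending.add(task);
    }

    void scheduleRecurring(final Runnable func) {
        schedule(new Runnable() {
            @Override
            public void run() {
                func.run();
                // In an anonymous class, 'this' is the Runnable itself, so it can re-enqueue itself.
                // In a lambda, 'this' would be the enclosing RecurringTimerSketch instance,
                // which is not a Runnable, so the equivalent call would not compile.
                schedule(this);
            }
        });
    }

    // Runs up to 'steps' queued tasks in order.
    void runNext(int steps) {
        for (int i = 0; i < steps && !pending.isEmpty(); i++) {
            pending.poll().run();
        }
    }
}
```

A lambda could still be made to work by routing the self-reference through a holder variable or a named method, but that is arguably no simpler than the anonymous class.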
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r168026607
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus {
+ public static final short IDENTIFIER = (short)-600;
+ private static final int SIZE_OF_ID = 2; // size if IDENTIFIER
+ private static final int SIZE_OF_INT = 4;
+
+ private static AtomicLong bpCount = new AtomicLong(0);
+
+ public String workerId;
+ public final long id; // monotonically increasing id
--- End diff --
It's only for debugging purposes, so that we can correlate sent and received msgs. I have used it to measure the latency involved in transmitting BackPressureStatus msgs.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167387609
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --
https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java#L66-L74
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:
https://github.com/apache/storm/pull/2502
Here's my test result: TVL with rate 85000 and max spout pending 5000. The number of spouts/splitters/counters is set to the worker count.
Again, this is just to see whether TVL, which we have been using, shows a clear performance regression. A full analysis like the [google doc in STORM-2306](https://docs.google.com/document/d/1A5k41UjVFY8jZg01BHc1fFxmI0AcFz5jRiKhNjhOj7I/edit?usp=sharing) would require a lot of effort and resources (dedicated machines). It would be great if we could update the numbers with latest master vs. the latest patch of STORM-2306, but we should be OK if we postpone the measurements until just before releasing Storm 2.0.0 and compare against the latest Storm 1.x release.
> 4 workers
>> master (ab7b4ca)
1.
```
(s) rate(tuple/s) mean(ms) 99%ile(ms) 99.9%ile(ms) cores mem(MB) failed
0 56,660.500 1,482.288 3,808.428 3,942.646 6.554 329.044 0
30 82,245.581 10.817 27.607 50.430 5.093 364.626 0
61 85,022.500 10.495 21.316 36.405 5.192 242.701 0
91 84,994.733 10.365 19.186 29.688 5.152 399.110 0
121 85,008.333 10.385 19.595 29.016 5.195 269.351 0
151 84,999.867 10.367 19.005 29.000 5.156 361.585 0
181 85,006.133 10.361 18.285 24.953 5.215 319.568 0
211 85,007.433 10.373 18.547 26.018 5.227 374.250 0
241 84,978.300 10.390 19.153 29.852 5.107 240.743 0
271 85,025.733 10.351 18.366 28.934 5.134 308.930 0
```
2.
```
(s) rate(tuple/s) mean(ms) 99%ile(ms) 99.9%ile(ms) cores mem(MB) failed
0 24,746.933 2,497.621 3,498.050 3,571.450 4.175 480.104 0
30 88,589.100 160.265 1,622.147 1,778.385 5.883 329.437 0
60 85,012.833 10.494 20.120 31.326 5.211 421.895 0
90 84,993.867 10.394 18.956 26.132 5.185 434.169 0
120 85,008.100 10.474 20.201 31.523 5.194 440.354 0
150 85,001.067 10.396 18.776 28.574 5.238 449.158 0
180 85,005.633 10.477 20.283 35.127 5.289 441.979 0
210 84,994.833 10.390 18.678 29.295 5.136 419.486 0
240 85,011.167 10.370 18.219 26.051 5.219 308.977 0
270 85,007.600 10.438 19.317 27.804 5.158 403.509 0
```
3.
```
(s) rate(tuple/s) mean(ms) 99%ile(ms) 99.9%ile(ms) cores mem(MB) failed
0 56,648.833 1,368.022 3,565.158 3,682.599 6.476 405.396 0
30 85,045.067 10.725 23.446 44.007 5.249 448.851 0
60 85,002.400 10.527 19.874 28.901 5.209 452.087 0
90 84,997.800 10.412 19.694 34.341 5.136 356.300 0
120 84,995.033 10.451 20.267 29.802 5.174 379.923 0
150 85,019.533 10.413 19.153 30.556 5.193 398.516 0
180 85,001.367 10.371 18.448 25.182 5.191 331.490 0
210 85,004.400 10.386 18.874 24.101 5.195 368.889 0
240 84,991.367 10.368 18.416 25.018 5.132 392.701 0
270 85,004.367 10.469 19.808 35.586 5.147 443.393 0
```
>> STORM-2306 (5b54809)
1.
```
(s) rate(tuple/s) mean(ms) 99%ile(ms) 99.9%ile(ms) cores mem(MB) failed
0 28,098.000 1,549.653 3,183.477 3,227.517 4.041 192.208 0
30 85,306.133 13.650 270.795 406.585 8.958 275.171 0
60 85,007.067 5.466 17.449 28.836 9.023 287.177 0
90 92,084.333 5.483 17.269 26.280 9.004 212.465 0
120 84,999.400 5.561 17.662 29.458 9.739 368.272 0
150 85,005.367 5.593 17.449 37.126 8.979 293.334 0
180 92,082.733 5.584 18.219 28.787 8.981 334.805 0
210 85,008.000 5.571 18.022 29.213 8.962 363.184 0
240 84,992.867 5.604 17.662 39.191 9.020 271.483 0
270 92,094.467 5.590 17.351 24.953 8.939 320.613 0
```
2.
```
(s) rate(tuple/s) mean(ms) 99%ile(ms) 99.9%ile(ms) cores mem(MB) failed
0 56,779.467 728.445 2,967.470 3,066.036 7.032 308.975 0
30 84,986.933 5.851 21.037 33.489 9.101 404.124 0
60 85,021.833 5.593 18.301 29.049 9.104 377.985 0
90 85,000.500 5.577 17.465 26.853 9.101 343.799 0
120 85,009.300 5.714 18.661 29.196 9.084 252.083 0
150 84,998.533 5.532 16.957 26.460 9.114 252.234 0
180 85,005.300 5.588 16.810 26.165 9.102 227.976 0
210 84,997.633 5.708 19.055 32.096 9.157 294.213 0
240 84,994.433 5.692 18.481 30.261 9.046 365.903 0
270 85,015.500 5.590 18.022 28.197 9.080 401.871 0
```
3.
```
(s) rate(tuple/s) mean(ms) 99%ile(ms) 99.9%ile(ms) cores mem(MB) failed
0 34,802.167 1,451.699 3,227.517 3,275.751 4.887 358.051 0
30 99,803.667 26.173 740.295 880.804 9.226 349.733 0
60 92,079.433 5.790 20.398 34.800 11.521 254.316 0
90 85,002.100 5.590 17.220 27.361 9.218 333.207 0
120 84,998.033 5.750 18.432 28.066 9.179 325.581 0
150 84,993.500 5.745 18.792 29.966 9.190 323.914 0
180 85,011.500 5.828 17.269 25.444 9.217 352.495 0
210 85,005.200 5.761 18.514 32.899 9.244 338.353 0
240 84,983.900 5.756 20.546 33.374 9.165 346.788 0
270 85,018.133 5.516 17.138 32.915 9.206 404.615 0
```
> 1 worker
>> master (ab7b4ca)
1.
```
(s) rate(tuple/s) mean(ms) 99%ile(ms) 99.9%ile(ms) cores mem(MB) failed
0 56,667.633 96.401 944.767 972.030 2.220 115.121 0
30 85,012.833 6.684 21.283 27.427 2.616 27.433 0
60 85,003.867 6.715 22.315 28.967 2.600 48.685 0
90 85,009.633 7.003 24.855 30.261 2.650 56.965 0
120 85,008.167 6.765 22.331 35.750 2.651 75.872 0
150 84,997.767 6.788 22.430 31.998 2.597 99.166 0
180 85,004.633 6.658 21.905 26.755 2.574 123.478 0
210 85,011.633 6.661 21.070 26.460 2.573 34.217 0
240 84,961.500 6.658 21.955 29.606 2.593 38.743 0
270 85,062.533 6.715 22.790 28.492 2.597 76.703 0
```
2.
```
(s) rate(tuple/s) mean(ms) 99%ile(ms) 99.9%ile(ms) cores mem(MB) failed
0 56,666.067 100.076 956.301 968.884 2.289 90.811 0
30 84,985.100 6.886 24.461 30.654 2.644 43.144 0
60 85,033.667 6.738 21.725 30.786 2.613 44.536 0
90 85,006.833 6.827 24.183 28.361 2.636 123.622 0
120 85,003.100 6.767 21.807 27.967 2.700 111.695 0
150 85,007.800 6.564 21.365 33.194 2.574 104.018 0
180 85,004.067 7.089 24.527 29.688 2.669 80.106 0
210 85,009.400 6.673 21.348 27.853 2.617 53.960 0
240 84,999.367 7.159 24.216 29.131 2.668 134.561 0
270 85,064.100 7.168 24.773 28.393 2.670 115.301 0
```
3.
```
(s) rate(tuple/s) mean(ms) 99%ile(ms) 99.9%ile(ms) cores mem(MB) failed
0 56,651.067 104.108 983.040 999.293 2.245 94.063 0
30 82,287.774 7.086 23.364 43.221 2.574 106.583 0
61 85,002.000 6.953 23.396 29.213 2.642 113.673 0
91 85,013.500 6.913 22.839 29.393 2.628 129.253 0
121 85,008.667 6.775 22.348 31.982 2.650 153.746 0
151 85,003.467 6.576 19.956 27.017 2.560 56.378 0
181 85,015.300 6.837 22.692 31.408 2.607 79.616 0
211 85,004.100 6.879 25.346 31.097 2.581 94.896 0
241 85,005.867 6.891 23.609 29.278 2.594 116.510 0
271 85,010.800 6.760 21.987 28.541 2.575 125.168 0
```
>> STORM-2306 (5b54809)
1.
```
(s) rate(tuple/s) mean(ms) 99%ile(ms) 99.9%ile(ms) cores mem(MB) failed
0 56,666.333 18.022 330.564 346.030 2.079 46.405 0
30 85,017.400 3.748 20.791 24.805 2.686 107.791 0
60 85,020.833 3.466 20.120 24.068 2.679 46.481 0
90 85,009.833 3.470 19.988 23.118 2.639 63.486 0
120 85,007.600 3.676 21.250 26.542 2.708 76.022 0
150 85,006.733 3.395 20.283 23.904 2.636 80.490 0
180 85,009.333 3.632 21.086 26.231 2.657 78.360 0
210 85,003.300 3.643 20.840 26.362 2.670 87.437 0
240 85,007.733 3.735 21.545 25.788 2.686 98.075 0
270 85,008.000 3.675 21.430 29.606 2.647 93.264 0
```
2.
```
(s) rate(tuple/s) mean(ms) 99%ile(ms) 99.9%ile(ms) cores mem(MB) failed
0 56,684.100 15.841 314.835 371.196 2.051 52.854 0
30 84,998.900 3.343 20.464 24.134 2.624 95.248 0
60 85,014.100 3.600 21.299 24.363 2.665 43.992 0
90 85,000.533 3.539 20.726 23.839 2.650 100.354 0
120 85,006.567 3.483 20.251 23.347 2.665 40.798 0
150 84,992.467 3.562 20.791 26.001 2.666 102.680 0
180 82,297.839 3.655 20.890 27.378 2.590 51.539 0
211 85,007.333 3.567 21.299 26.640 2.650 107.193 0
241 85,012.467 3.604 20.955 27.460 2.646 50.419 0
271 85,001.333 3.288 19.726 24.543 2.604 103.526 0
```
3.
```
(s) rate(tuple/s) mean(ms) 99%ile(ms) 99.9%ile(ms) cores mem(MB) failed
0 56,680.200 14.695 299.893 340.263 2.096 100.239 0
30 85,012.833 4.007 21.512 27.427 2.719 67.345 0
60 85,003.900 3.921 22.233 31.228 2.709 43.670 0
90 85,009.467 3.881 22.249 26.722 2.683 23.378 0
120 85,027.600 3.955 21.840 25.772 2.717 108.810 0
150 85,009.300 4.049 22.479 36.405 2.724 80.150 0
180 85,005.933 3.381 20.283 23.478 2.639 43.224 0
210 85,007.733 3.798 21.053 24.199 2.717 115.814 0
240 85,007.800 4.275 22.266 26.608 2.751 106.709 0
270 85,005.867 3.507 20.644 24.297 2.700 59.038 0
```
Overall, this patch roughly halves the mean latency, whereas the 99%ile/99.9%ile latencies are close to those of master. It also shows that this patch consumes more CPU: the gap is fairly small for a single worker (about 2.6 vs. 2.7 cores) but becomes large with 4 workers (about 5.2 vs. 9 cores). I might have made a mistake in testing, so it would be really nice if someone else also runs the multi-worker test and shares the result.
Even if my test result is not wrong, I'm +1, because this patch introduces a better backpressure design; backpressure should have been enabled by default, but we had disabled it. We can file follow-up issues if there is something we should address.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r161358991
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex
}
}
+ public Queue<AddressedTuple> getPendingEmits() {
+ return pendingEmits;
+ }
+
/**
* separated from mkExecutor in order to replace executor transfer in executor data for testing
*/
public ExecutorShutdown execute() throws Exception {
LOG.info("Loading executor tasks " + componentId + ":" + executorId);
- registerBackpressure();
- Utils.SmartThread systemThreads =
- Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
-
String handlerName = componentId + "-executor" + executorId;
- Utils.SmartThread handlers =
+ Utils.SmartThread handler =
Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
setupTicks(StatsUtil.SPOUT.equals(type));
LOG.info("Finished loading executor " + componentId + ":" + executorId);
- return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
+ return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask);
}
public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
- @SuppressWarnings("unchecked")
@Override
- public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
- ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
- for (AddressedTuple addressedTuple : addressedTuples) {
- TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
- int taskId = addressedTuple.getDest();
- if (isDebug) {
- LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
- }
+ public void accept(Object event) {
+ if (event == JCQueue.INTERRUPT) {
+ throw new RuntimeException(new InterruptedException("JCQ processing interrupted") );
+ }
+ AddressedTuple addressedTuple = (AddressedTuple)event;
+ int taskId = addressedTuple.getDest();
+
+ TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+ if (isDebug) {
+ LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
+ }
+
+ try {
if (taskId != AddressedTuple.BROADCAST_DEST) {
tupleActionFn(taskId, tuple);
} else {
for (Integer t : taskIds) {
tupleActionFn(t, tuple);
}
}
+ } catch (Exception e) {
+ throw new RuntimeException(e);
--- End diff --
[Utils.asyncLoop()](https://github.com/roshannaik/storm/blob/STORM-2306-2/storm-client/src/jvm/org/apache/storm/utils/Utils.java#L351) of the BoltExecutor or SpoutExecutor thread
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r168034925
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -22,19 +22,23 @@
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
--- End diff --
I thought STORM-2945 was filed to find a way to support background emit without external synchronization, so there is a chance it stays unresolved in 2.x. If you intended to document in STORM-2945 how to enable background emit with the current state, please ignore this comment.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r161358833
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex
}
}
+ public Queue<AddressedTuple> getPendingEmits() {
+ return pendingEmits;
+ }
+
/**
* separated from mkExecutor in order to replace executor transfer in executor data for testing
*/
public ExecutorShutdown execute() throws Exception {
LOG.info("Loading executor tasks " + componentId + ":" + executorId);
- registerBackpressure();
- Utils.SmartThread systemThreads =
- Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
-
String handlerName = componentId + "-executor" + executorId;
- Utils.SmartThread handlers =
+ Utils.SmartThread handler =
Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
setupTicks(StatsUtil.SPOUT.equals(type));
LOG.info("Finished loading executor " + componentId + ":" + executorId);
- return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
+ return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask);
}
public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
- @SuppressWarnings("unchecked")
@Override
- public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
- ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
- for (AddressedTuple addressedTuple : addressedTuples) {
- TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
- int taskId = addressedTuple.getDest();
- if (isDebug) {
- LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
- }
+ public void accept(Object event) {
+ if (event == JCQueue.INTERRUPT) {
+ throw new RuntimeException(new InterruptedException("JCQ processing interrupted") );
--- End diff --
That exception bubbles up to and gets caught by the Utils.asyncLoop in the thread that calls the recvQueue.consume() [i.e. a BoltExecutor or SpoutExecutor thread]
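A rough sketch of that flow, with hypothetical names (`BubbleUpSketch`, `runLoop`); the real handling inside Utils.asyncLoop is not shown in this diff, so the catch logic below is an assumption for illustration only.

```java
import java.util.function.Consumer;

class BubbleUpSketch {
    // Hypothetical sentinel; stands in for JCQueue.INTERRUPT.
    static final Object INTERRUPT = new Object();

    // Same shape as accept() in the diff: the checked InterruptedException is
    // wrapped in a RuntimeException so it can cross the Consumer boundary.
    static final Consumer<Object> HANDLER = event -> {
        if (event == INTERRUPT) {
            throw new RuntimeException(new InterruptedException("processing interrupted"));
        }
    };

    // Hypothetical driver loop on the executor thread: feeds events to the
    // handler and reports why it stopped.
    static String runLoop(Object... events) {
        try {
            for (Object e : events) {
                HANDLER.accept(e);
            }
            return "drained";
        } catch (RuntimeException ex) {
            // Unwrap to tell an interrupt apart from a genuine failure.
            if (ex.getCause() instanceof InterruptedException) {
                return "interrupted";
            }
            throw ex;
        }
    }
}
```

The point is only that the wrapped exception unwinds through the consume call on the same thread, so the outer loop is the single place that needs to recognize it.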
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r160065976
--- Diff: storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java ---
@@ -24,50 +24,46 @@
import org.apache.storm.task.GeneralTopologyContext;
public class TupleImpl implements Tuple {
- private final List<Object> values;
- private final int taskId;
- private final String streamId;
- private final GeneralTopologyContext context;
- private final MessageId id;
+ private List<Object> values;
+ private int taskId;
+ private String streamId;
+ private GeneralTopologyContext context;
+ private MessageId id;
+ private final String srcComponent;
private Long _processSampleStartTime;
private Long _executeSampleStartTime;
private long _outAckVal = 0;
-
+
public TupleImpl(Tuple t) {
this.values = t.getValues();
this.taskId = t.getSourceTask();
this.streamId = t.getSourceStreamId();
this.id = t.getMessageId();
this.context = t.getContext();
- if (t instanceof TupleImpl) {
+ this.srcComponent = t.getSourceComponent();
+ try {
TupleImpl ti = (TupleImpl) t;
this._processSampleStartTime = ti._processSampleStartTime;
this._executeSampleStartTime = ti._executeSampleStartTime;
this._outAckVal = ti._outAckVal;
+ } catch (ClassCastException e) {
+ // ignore ... if t is not a TupleImpl type .. faster than checking and then casting
}
}
- public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
+ public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId, MessageId id) {
this.values = Collections.unmodifiableList(values);
this.taskId = taskId;
this.streamId = streamId;
this.id = id;
this.context = context;
-
- String componentId = context.getComponentId(taskId);
- Fields schema = context.getComponentOutputFields(componentId, streamId);
- if(values.size()!=schema.size()) {
- throw new IllegalArgumentException(
- "Tuple created with wrong number of fields. " +
- "Expected " + schema.size() + " fields but got " +
- values.size() + " fields");
- }
--- End diff --
I think having a map from the (taskId, streamId) pair to the schema's size may help reduce the slowdown. I know we are trying to avoid map lookups, but it looks like an acceptable tradeoff to me if we still want to keep the sanity check.
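Something along these lines, as a sketch only. `SchemaSizeCache` and the `slowLookup` function are hypothetical; `slowLookup` stands in for the `context.getComponentId` / `getComponentOutputFields` path removed above. The sketch is not thread safe, so it assumes one cache per executor thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical helper: caches the expected field count per (taskId, streamId).
class SchemaSizeCache {
    private final Map<Integer, Map<String, Integer>> sizes = new HashMap<>();
    private final BiFunction<Integer, String, Integer> slowLookup;
    int slowLookups = 0; // counts cache misses, for illustration only

    SchemaSizeCache(BiFunction<Integer, String, Integer> slowLookup) {
        this.slowLookup = slowLookup;
    }

    // The slow path runs once per (taskId, streamId); later calls hit the maps.
    int expectedSize(int taskId, String streamId) {
        return sizes.computeIfAbsent(taskId, k -> new HashMap<>())
                    .computeIfAbsent(streamId, s -> {
                        slowLookups++;
                        return slowLookup.apply(taskId, s);
                    });
    }

    // Same sanity check as the removed code, minus the repeated schema lookup.
    void check(int taskId, String streamId, int actualSize) {
        int expected = expectedSize(taskId, streamId);
        if (actualSize != expected) {
            throw new IllegalArgumentException(
                "Tuple created with wrong number of fields. Expected "
                + expected + " fields but got " + actualSize + " fields");
        }
    }
}
```

This still costs two hash lookups per tuple, which is the tradeoff mentioned above.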
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r163967849
--- Diff: storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java ---
@@ -47,7 +47,6 @@
public static final String STORMS_ROOT = "storms";
public static final String SUPERVISORS_ROOT = "supervisors";
public static final String WORKERBEATS_ROOT = "workerbeats";
- public static final String BACKPRESSURE_ROOT = "backpressure";
--- End diff --
@revans2 can you clarify? 2.x will require topo jars to be rebuilt for it.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159956952
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.utils.JCQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.storm.Constants.SYSTEM_TASK_ID;
+
+public class BackPressureTracker {
+ static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
+
+ private final Map<Integer, JCQueue> bpTasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
+ private final Set<Integer> nonBpTasks = ConcurrentHashMap.newKeySet();
+ private final String workerId;
+
+ public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
+ this.workerId = workerId;
+ this.nonBpTasks.addAll(allLocalTasks); // all tasks are considered to be not under BP initially
+ this.nonBpTasks.remove((int)SYSTEM_TASK_ID); // not tracking system task
+ }
+
+ /* called by transferLocalBatch() on NettyWorker thread
+ * returns true if an update was recorded, false if taskId is already under BP
+ */
+ public boolean recordBackpressure(Integer taskId, JCQueue recvQ) {
+ if (nonBpTasks.remove(taskId)) {
+ bpTasks.put(taskId, recvQ);
+ return true;
+ }
+ return false;
+ }
+
+ // returns true if there was a change in the BP situation
+ public boolean refreshBpTaskList() {
+ boolean changed = false;
+ LOG.debug("Running Back Pressure status change check");
+ for (Iterator<Entry<Integer, JCQueue>> itr = bpTasks.entrySet().iterator(); itr.hasNext(); ) {
+ Entry<Integer, JCQueue> entry = itr.next();
+ if (entry.getValue().isEmptyOverflow()) {
+ // move task from bpTasks to noBpTasks
+ nonBpTasks.add(entry.getKey());
+ itr.remove();
--- End diff --
It looks like there is a race condition here with `recordBackpressure`. If `nonBpTasks.add(entry.getKey())` finishes, then we get a context switch and `recordBackpressure` completes fully for the same taskId, and only then `itr.remove` happens, we might end up in an inconsistent state.
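To make the interleaving concrete, here is a sketch that replays it step by step on a single thread, followed by one possible fix that puts both moves under the same lock. `BpRaceSketch` and `LockedTracker` are hypothetical names, and the queue is replaced by a String placeholder; this is not necessarily how the PR should resolve it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class BpRaceSketch {
    // Replays the problematic ordering; returns true if the task ends up
    // in neither collection.
    static boolean replayLosesTask() {
        Map<Integer, String> bpTasks = new HashMap<>();
        Set<Integer> nonBpTasks = new HashSet<>();
        int taskId = 7;
        bpTasks.put(taskId, "recvQ"); // task 7 starts out under BP

        // refreshBpTaskList, step 1: mark the task as not under BP
        nonBpTasks.add(taskId);
        // context switch: recordBackpressure runs to completion
        if (nonBpTasks.remove(taskId)) {
            bpTasks.put(taskId, "recvQ");
        }
        // refreshBpTaskList, step 2: the itr.remove() equivalent
        bpTasks.remove(taskId);

        return !bpTasks.containsKey(taskId) && !nonBpTasks.contains(taskId);
    }
}

// One possible fix: both moves happen under the same monitor.
class LockedTracker {
    private final Map<Integer, String> bpTasks = new HashMap<>();
    private final Set<Integer> nonBpTasks = new HashSet<>();

    LockedTracker(int... localTasks) {
        for (int t : localTasks) {
            nonBpTasks.add(t);
        }
    }

    synchronized boolean recordBackpressure(int taskId, String recvQ) {
        if (nonBpTasks.remove(taskId)) {
            bpTasks.put(taskId, recvQ);
            return true;
        }
        return false;
    }

    // The move refreshBpTaskList performs for one drained task.
    synchronized boolean release(int taskId) {
        if (bpTasks.remove(taskId) != null) {
            nonBpTasks.add(taskId);
            return true;
        }
        return false;
    }

    // A task must be in exactly one of the two collections.
    synchronized boolean isConsistent(int taskId) {
        return bpTasks.containsKey(taskId) ^ nonBpTasks.contains(taskId);
    }
}
```

In the replay, the task is lost from both collections, so a later `recordBackpressure` for it would always return false. A coarse lock avoids that at the cost of contention on the NettyWorker thread; a finer-grained scheme may be preferable.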
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167097905
--- Diff: storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
@@ -17,72 +17,124 @@
*/
package org.apache.storm.executor;
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.EventHandler;
import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.DisruptorQueue;
-import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Queue;
-public class ExecutorTransfer implements EventHandler, Callable {
+// Every executor has an instance of this class
+public class ExecutorTransfer {
private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
private final WorkerState workerData;
- private final DisruptorQueue batchTransferQueue;
- private final Map<String, Object> topoConf;
private final KryoTupleSerializer serializer;
- private final MutableObject cachedEmit;
private final boolean isDebug;
+ private final int producerBatchSz;
+ private int remotesBatchSz = 0;
+ private int indexingBase = 0;
+ private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker
+ private ArrayList<JCQueue> queuesToFlush; // [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance
- public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map<String, Object> topoConf) {
+
+ public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
this.workerData = workerData;
- this.batchTransferQueue = batchTransferQueue;
- this.topoConf = topoConf;
this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
- this.cachedEmit = new MutableObject(new ArrayList<>());
this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
+ this.producerBatchSz = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+ }
+
+ // to be called after all Executor objects in the worker are created and before this object is used
+ public void initLocalRecvQueues() {
+ Integer minTaskId = workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
+ this.localReceiveQueues = Utils.convertToArray( workerData.getLocalReceiveQueues(), minTaskId);
+ this.indexingBase = minTaskId;
+ this.queuesToFlush = new ArrayList<JCQueue>(Collections.nCopies(localReceiveQueues.size(), null) );
}
- public void transfer(int task, Tuple tuple) {
- AddressedTuple val = new AddressedTuple(task, tuple);
+ // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
+ public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
if (isDebug) {
- LOG.info("TRANSFERRING tuple {}", val);
+ LOG.info("TRANSFERRING tuple {}", addressedTuple);
+ }
+
+ JCQueue localQueue = getLocalQueue(addressedTuple);
+ if (localQueue!=null) {
+ return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
+ } else {
+ if (remotesBatchSz >= producerBatchSz) {
+ if ( !workerData.tryFlushRemotes() ) {
+ if (pendingEmits != null) {
+ pendingEmits.add(addressedTuple);
+ }
+ return false;
+ }
+ remotesBatchSz = 0;
--- End diff --
@roshannaik and I briefly talked about this and agree that this should be fixed. The clear but higher-impact way is to guard the whole else branch with `synchronized` (or more efficient locking, if any), but @roshannaik wants to explore ways to minimize the impact of synchronization.
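For reference, a minimal sketch of the straightforward option, assuming the concern is several threads racing on the remote batch counter. `RemoteBatcherSketch` is hypothetical, and clearing the list stands in for `workerData.tryFlushRemotes()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the remote branch: check, flush, reset and add
// all happen as one unit under a single lock.
class RemoteBatcherSketch {
    private final Object remoteLock = new Object();
    private final List<String> batch = new ArrayList<>();
    private final int batchSize;
    private int flushes = 0;

    RemoteBatcherSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    void transferRemote(String tuple) {
        synchronized (remoteLock) {
            if (batch.size() >= batchSize) {
                batch.clear(); // stands in for flushing to the remote worker
                flushes++;
            }
            batch.add(tuple);
        }
    }

    int flushes() {
        synchronized (remoteLock) {
            return flushes;
        }
    }

    int pending() {
        synchronized (remoteLock) {
            return batch.size();
        }
    }

    // Drives the batcher from several threads and waits for them to finish.
    static RemoteBatcherSketch hammer(int threads, int perThread, int batchSize) {
        RemoteBatcherSketch b = new RemoteBatcherSketch(batchSize);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    b.transferRemote("t");
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return b;
    }
}
```

With the lock in place no tuple is dropped or counted twice, whatever the interleaving; the cost is that every remote emit takes the lock, which is the impact being weighed here.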
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:
https://github.com/apache/storm/pull/2502
I'm running a performance test (TVL) to see whether there's any regression here. As @revans2 already did performance testing, I won't spend time going too deep (just a couple of tests). I will give my +1 once things go well.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167286681
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -890,30 +871,91 @@
public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
/**
- * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency
- * vs. CPU usage
+ * Selects the Bolt's Wait Strategy to use when there are no incoming msgs. Used to trade off latency vs CPU usage.
+ */
+ @isString
+ public static final String TOPOLOGY_BOLT_WAIT_STRATEGY = "topology.bolt.wait.strategy";
+
+ /**
+ * Configures park time for WaitStrategyPark. If set to 0, returns immediately (i.e busy wait).
*/
- @isInteger
@NotNull
- public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
+ @isPositiveNumber(includeZero = true)
+ public static final String TOPOLOGY_BOLT_WAIT_PARK_MICROSEC = "topology.bolt.wait.park.microsec";
/**
- * The number of tuples to batch before sending to the next thread. This number is just an initial suggestion and
- * the code may adjust it as your topology runs.
+ * Configures number of iterations to spend in level 1 of WaitStrategyProgressive, before progressing to level 2
*/
+ @NotNull
@isInteger
@isPositiveNumber
+ public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT = "topology.bolt.wait.progressive.level1.count";
+
+ /**
+ * Configures number of iterations to spend in level 2 of WaitStrategyProgressive, before progressing to level 3
+ */
@NotNull
- public static final String TOPOLOGY_DISRUPTOR_BATCH_SIZE="topology.disruptor.batch.size";
+ @isInteger
+ @isPositiveNumber
+ public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT = "topology.bolt.wait.progressive.level2.count";
/**
- * The maximum age in milliseconds a batch can be before being sent to the next thread. This number is just an
- * initial suggestion and the code may adjust it as your topology runs.
+ * Configures sleep time for WaitStrategyProgressive.
*/
+ @NotNull
+ @isPositiveNumber(includeZero = true)
+ public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS = "topology.bolt.wait.progressive.level3.sleep.millis";
+
+
+ /**
+ * A class that implements a wait strategy for an upstream component (spout/bolt) trying to write to a downstream component
+ * whose recv queue is full
+ *
+ * 1. nextTuple emits no tuples
+ * 2. The spout has hit maxSpoutPending and can't emit any more tuples
+ */
+ @isString
+ public static final String TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY="topology.backpressure.wait.strategy";
--- End diff --
Here too, if this is a class name, it would be good to verify early on that it is an instance of a given class.
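A minimal sketch of such an early check, under assumptions: `WaitStrategy` is a stand-in interface (not Storm's `IWaitStrategy`), and `ClassNameValidator` is a hypothetical helper, not part of Storm's existing config validation. It only shows the idea of failing when the config is read rather than when the class is first instantiated.

```java
// Hypothetical stand-in for the interface the configured class must implement.
interface WaitStrategy {
    void idle();
}

// Example implementation, used only to exercise the check.
final class ParkStrategy implements WaitStrategy {
    @Override
    public void idle() {
        Thread.yield();
    }
}

final class ClassNameValidator {
    private ClassNameValidator() {
    }

    // Fails fast if className cannot be loaded or does not implement expectedType.
    static void validate(String className, Class<?> expectedType) {
        final Class<?> actual;
        try {
            actual = Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Class not found: " + className, e);
        }
        if (!expectedType.isAssignableFrom(actual)) {
            throw new IllegalArgumentException(
                className + " does not implement " + expectedType.getName());
        }
    }

    public static void main(String[] args) {
        validate(ParkStrategy.class.getName(), WaitStrategy.class); // accepted
        try {
            validate("java.lang.String", WaitStrategy.class);
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```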
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167379385
--- Diff: pom.xml ---
@@ -259,6 +259,7 @@
<snakeyaml.version>1.11</snakeyaml.version>
<httpclient.version>4.3.3</httpclient.version>
<clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
+ <jctools.version>2.0.1</jctools.version>
--- End diff --
I don't see any references to jctools in any flux-related pom. Can you point to the specific offending location?
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:
https://github.com/apache/storm/pull/2502
@roshannaik Yes sure. https://issues.apache.org/jira/browse/STORM-2983 This blocks my current work and I would really appreciate it if it can be solved soon. Thanks
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r161361250
--- Diff: storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java ---
@@ -76,6 +76,32 @@ public static Integer getInt(Object o, Integer defaultValue) {
throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
}
+ public static Long getLong(Object o) {
+ return getLong(o, null);
+ }
+
+ public static Long getLong(Object o, Long defaultValue) {
+ if (null == o) {
+ return defaultValue;
+ }
+
+ if ( o instanceof Long ||
+ o instanceof Integer ||
+ o instanceof Short ||
+ o instanceof Byte) {
+ return ((Number) o).longValue();
+ } else if (o instanceof Double) {
+ final long l = (Long) o;
--- End diff --
Yes, my bad. I think it's OK to throw an exception for numbers too large for a Long.
I don't see the bug in getInt.
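For reference, the quoted line `final long l = (Long) o;` casts a `Double` to `Long`, which throws `ClassCastException`. One possible shape for the `Double` branch is sketched below. `LongConversion` is a hypothetical helper, and rejecting fractional values is an assumption about the desired behavior, not something settled in this thread.

```java
final class LongConversion {
    private LongConversion() {
    }

    // Converts a Double to long, throwing for NaN, infinities, fractional
    // values, and magnitudes that do not fit in a long.
    static long toLong(Double boxed) {
        final double v = boxed;
        if (Double.isNaN(v) || Double.isInfinite(v)) {
            throw new IllegalArgumentException("Not a finite number: " + v);
        }
        if (v != Math.rint(v)) {
            throw new IllegalArgumentException("Not a whole number: " + v);
        }
        // 2^63 is exactly representable as a double; Long.MAX_VALUE is not,
        // so the upper bound is checked against 2^63 exclusively.
        if (v < -0x1p63 || v >= 0x1p63) {
            throw new IllegalArgumentException("Out of range for long: " + v);
        }
        return (long) v;
    }

    public static void main(String[] args) {
        System.out.println(toLong(42.0));
        try {
            toLong(1e19);
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```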
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159969475
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -328,20 +328,22 @@ public static boolean isSystemId(String id) {
* @return the newly created thread
* @see Thread
*/
- public static SmartThread asyncLoop(final Callable afn,
- boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
- int priority, final boolean isFactory, boolean startImmediately,
- String threadName) {
+ public static SmartThread asyncLoop(final Callable afn, boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
+ int priority, final boolean isFactory, boolean startImmediately,
+ String threadName) {
SmartThread thread = new SmartThread(new Runnable() {
public void run() {
- Object s;
try {
- Callable fn = isFactory ? (Callable) afn.call() : afn;
- while ((s = fn.call()) instanceof Long) {
- Time.sleepSecs((Long) s);
+ final Callable<Long> fn = isFactory ? (Callable<Long>) afn.call() : afn;
+ while (true) {
+ final Long s = fn.call();
+ if (s==null) // then stop running it
+ break;
+ if (s>0)
+ Thread.sleep(s);
--- End diff --
This needs to be Time.sleep if we want simulated time to work properly....
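To illustrate why this matters: `Thread.sleep` always waits in real time, so a test that drives a simulated clock cannot wake the thread early. The sketch below is a hypothetical `SimClock`, not Storm's `Time` class or its API; it only shows the general idea of routing sleeps through a clock that tests can advance.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical clock, NOT Storm's Time class: sleeps on real time normally,
// and on a manually advanced counter while a test is simulating time.
final class SimClock {
    private static final AtomicLong simulatedNowMs = new AtomicLong();
    private static volatile boolean simulating = false;

    private SimClock() {
    }

    static void startSimulating() {
        simulatedNowMs.set(0);
        simulating = true;
    }

    static void stopSimulating() {
        simulating = false;
    }

    // Moves simulated time forward; only tests call this.
    static void advance(long ms) {
        simulatedNowMs.addAndGet(ms);
    }

    static void sleep(long ms) throws InterruptedException {
        if (!simulating) {
            Thread.sleep(ms);
            return;
        }
        final long wakeAt = simulatedNowMs.get() + ms;
        // Poll in short real-time slices until the test has advanced far enough.
        while (simulating && simulatedNowMs.get() < wakeAt) {
            Thread.sleep(1);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        startSimulating();
        Thread sleeper = new Thread(() -> {
            try {
                SimClock.sleep(60_000); // one simulated minute
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sleeper.start();
        while (sleeper.isAlive()) {
            advance(60_000);
            sleeper.join(10);
        }
        stopSimulating();
        System.out.println("sleeper finished without waiting a real minute");
    }
}
```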
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159966670
--- Diff: storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java ---
@@ -20,6 +20,7 @@
import org.apache.storm.Config;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.StormTopology;
+import org.apache.storm.messaging.netty.BackPressureStatus;
--- End diff --
Is this change needed? Is this even used here?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159969170
--- Diff: storm-client/src/jvm/org/apache/storm/utils/JCQueue.java ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metric.internal.RateTracker;
+import org.jctools.queues.MessagePassingQueue;
+import org.jctools.queues.MpscArrayQueue;
+import org.jctools.queues.MpscUnboundedArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public final class JCQueue implements IStatefulObject {
+ private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
+
+ public static final Object INTERRUPT = new Object();
+
+ private final ThroughputMeter emptyMeter = new ThroughputMeter("EmptyBatch");
--- End diff --
This is never reported anywhere... Do we want to just delete it and move ThroughputMeter to perf?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r160010894
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -105,19 +134,16 @@ public void reportError(Throwable error) {
msgId = MessageId.makeUnanchored();
}
- TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId);
- executor.getExecutorTransfer().transfer(t, tuple);
+ final TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
+ AddressedTuple adrTuple = new AddressedTuple(t, tuple);
+ executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
}
if (isEventLoggers) {
- executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), messageId, random);
+ taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
}
--- End diff --
@HeartSaVioR You pointed out that some optimizations are possible here, which we can tackle in another JIRA. Can you elaborate, or capture your thoughts in a JIRA?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167325679
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.storm.Config;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.serialization.ITupleSerializer;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.TransferDrainer;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Utils.SmartThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+// Transfers messages destined to other workers
+class WorkerTransfer implements JCQueue.Consumer {
+ static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class);
+
+ private final TransferDrainer drainer;
+ private WorkerState workerState;
+ private IWaitStrategy backPressureWaitStrategy;
+
+ JCQueue transferQueue; // [remoteTaskId] -> JCQueue. Some entries maybe null (if no emits to those tasksIds from this worker)
+ AtomicBoolean[] remoteBackPressureStatus; // [[remoteTaskId] -> true/false : indicates if remote task is under BP.
--- End diff --
Same here for package private.
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:
https://github.com/apache/storm/pull/2502
@HeartSaVioR
I am not seeing the reported excessive CPU usage for 4 workers. First I took some runs on my mid-2015 MacBook Pro (single CPU socket). Since no issue was seen there, I reran the same on a Linux server (2 sockets, 6 physical cores per socket).
The command below was run.
{code}
bin/storm jar storm-loadgen-*.jar org.apache.storm.loadgen.ThroughputVsLatency --rate 85000 --spouts 4 --splitters 4 --counters 4 -c topology.max.spout.pending=5000 -c topology.workers=4
{code}
All runs were similar, so I am posting results from just one run for each.
## MacBook Runs:
**Master without 2306** (#ab7b4ca) [**Latency from UI =240ms**]
start(s) | end(s) | rate(tuple/s) | mean(ms) | 99%ile(ms) | 99.9%ile(ms) | cores | mem(MB)
-- | -- | -- | -- | -- | -- | -- | --
0 | 30 | 19,319.467 | 8,411.912 | 13,153.337 | 13,254.001 | 5.275 | 340.719
30 | 60 | 72,613.500 | 16,113.830 | 17,750.295 | 17,884.512 | 6.849 | 364.807
60 | 90 | 75,519.067 | 19,671.741 | 21,474.836 | 21,558.723 | 6.755 | 344.906
90 | 120 | 79,225.167 | 22,121.255 | 23,823.647 | 23,941.087 | 6.878 | 222.115
120 | 150 | 80,967.567 | 23,784.484 | 25,585.254 | 25,652.363 | 6.889 | 365.383
150 | 180 | 80,062.067 | 25,269.534 | 27,665.629 | 27,866.956 | 6.895 | 343.674
180 | 210 | 78,306.300 | 27,560.823 | 30,467.424 | 30,551.310 | 7.025 | 436.814
210 | 240 | 78,586.667 | 29,673.204 | 32,883.343 | 33,000.784 | 6.945 | 293.014
240 | 270 | 80,500.667 | 31,513.715 | 34,795.946 | 34,829.500 | 6.818 | 416.121
270 | 300 | 82,625.667 | 32,883.453 | 35,903.242 | 35,970.351 | 6.899 | 364.058
**Master with 2306** (#09e0123) [**Latency from UI =89ms**]
start(s) | end(s) | rate(tuple/s) | mean(ms) | 99%ile(ms) | 99.9%ile(ms) | cores | mem(MB)
-- | -- | -- | -- | -- | -- | -- | --
0 | 30 | 19,607.100 | 8,127.475 | 14,277.411 | 14,772.339 | 4.861 | 330.486
30 | 60 | 85,946.533 | 12,737.708 | 18,471.715 | 18,589.155 | 6.587 | 418.567
60 | 90 | 91,256.133 | 12,276.112 | 17,901.289 | 18,001.953 | 6.527 | 229.531
90 | 120 | 95,317.967 | 9,204.098 | 14,529.069 | 14,612.955 | 6.552 | 432.803
120 | 150 | 97,220.233 | 5,221.476 | 9,865.003 | 10,125.050 | 6.551 | 169.757
150 | 180 | 92,499.200 | 984.904 | 4,213.178 | 4,546.626 | 6.746 | 280.883
180 | 210 | 79,557.700 | 1,059.939 | 2,619.343 | 2,766.143 | 6.155 | 430.853
210 | 240 | 79,766.467 | 2,336.027 | 5,158.994 | 5,347.738 | 6.238 | 288.708
240 | 270 | 81,595.800 | 4,284.723 | 7,377.781 | 7,528.776 | 6.258 | 315.524
270 | 300 | 88,294.067 | 5,024.767 | 8,422.162 | 8,493.466 | 6.412 | 263.682
## Linux Server Runs:
**Master without 2306** (#ab7b4ca) [**Latency from UI =15ms**]
start_time(s) | end_time(s) | rate(tuple/s) | mean(ms) | 99%ile(ms) | 99.9%ile(ms) | cores | mem(MB)
-- | -- | -- | -- | -- | -- | -- | --
0 | 30 | 56,704.80 | 1,194.20 | 3,867.15 | 3,978.30 | 11.294 | 978.701
30 | 60 | 85,001.70 | 12.018 | 21.479 | 26.837 | 11.931 | 897.105
60 | 90 | 85,007.17 | 11.84 | 21.152 | 26.345 | 11.575 | 950.129
90 | 120 | 85,008.13 | 11.781 | 20.447 | 24.986 | 11.688 | 882.768
120 | 150 | 85,009.57 | 11.841 | 21.021 | 25.592 | 11.735 | 921.339
150 | 180 | 84,997.60 | 11.763 | 20.791 | 25.199 | 11.476 | 961.395
180 | 210 | 85,005.57 | 11.824 | 20.66 | 25.281 | 11.798 | 1,000.03
210 | 240 | 85,008.00 | 11.75 | 20.611 | 25.706 | 11.354 | 1,120.33
240 | 271 | 82,259.16 | 11.916 | 20.955 | 25.117 | 11.517 | 939.918
271 | 301 | 85,003.57 | 11.719 | 20.398 | 24.822 | 11.322 | 980.654
**Master with 2306** (#09e0123) [**Latency from UI =7.8ms**]
start_time(s) | end_time(s) | rate(tuple/s) | mean(ms) | 99%ile(ms) | 99.9%ile(ms) | cores | mem(MB)
-- | -- | -- | -- | -- | -- | -- | --
0 | 30 | 56,701.03 | 426.534 | 2,673.87 | 2,793.41 | 7.693 | 290.006
30 | 60 | 85,004.40 | 3.387 | 8.294 | 14.655 | 5.89 | 231.328
60 | 90 | 85,002.20 | 3.332 | 7.66 | 10.945 | 5.791 | 247.683
90 | 121 | 82,259.87 | 3.33 | 7.516 | 9.839 | 5.591 | 264.324
121 | 151 | 85,004.37 | 3.349 | 7.737 | 10.969 | 5.819 | 272.916
151 | 181 | 85,001.07 | 3.323 | 7.434 | 9.961 | 5.747 | 203.54
181 | 211 | 85,002.57 | 3.335 | 7.586 | 10.281 | 5.794 | 304.567
211 | 241 | 85,005.03 | 3.326 | 7.475 | 9.921 | 5.836 | 317.026
241 | 271 | 85,003.27 | 3.339 | 7.565 | 10.527 | 5.723 | 251.676
271 | 301 | 85,002.00 | 3.351 | 7.799 | 12.394 | 5.716 | 377.182
## Summary:
**On Linux:** Both CPU and memory usage were significantly better with 2306.
Actual latency (taken from the UI) was also much better with 2306.
**On MacBook:** CPU and memory usage were relatively close, slightly favoring 2306. Again, the actual latency was much better with 2306.
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:
https://github.com/apache/storm/pull/2502
It is a nice feeling to see this merged in, and I will look into the CPU usage issue reported by @HeartSaVioR. I just wanted to specifically call out and thank the following folks, who have been very helpful in making this possible:
Sapin Amin, @harshach, @revans2 , @arunmahadevan, @HeartSaVioR and @satishd !
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167402753
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --
ControlMessage, MessageBatch and SaslMessageToken are handled explicitly by MessageDecoder and MessageEncoder using the buffer and read methods.
When ControlMessage, SaslMessageToken, or MessageBatch write themselves out to a buffer, they do not use kryo to do it.
https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java#L60-L64
https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java#L86-L101
https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java#L80-L93
It is a hand coded protocol (for good or bad).
The messages inside MessageBatch have already been serialized using kryo elsewhere in the pipeline.
For the load messages we hid them as a tuple inside a MessageBatch. We did this so we could do a rolling upgrade with it, but it is an ugly hack.
BackPressureStatus is serialized/deserialized using MessageEncoder/MessageDecoder, but it uses kryo internally to do it.
So please remove Serializable from BackPressureStatus and register it with kryo. Do not remove `BackPressureStatus.buffer()` or `BackPressureStatus.read()`. I think the changes to KryoTupleSerializer to send a BackPressureStatus can be removed, assuming no one is calling them directly.
The simplest way to test this is to run a topology with multiple workers and `topology.fall.back.on.java.serialization=false`. This config prevents kryo from falling back to Java serialization.
For normal tuples/end users you can set `topology.testing.always.try.serialize=true` and every tuple emitted will be serialized, even in local mode. This is a way to unit test that you have set up kryo appropriately, but BackPressureStatus is a special message, so we need to do it differently.
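As a rough illustration of what a hand-coded `buffer()`/`read()` pair looks like, here is a sketch using `java.nio.ByteBuffer`. `StatusMessage`, its type code, and its field layout are hypothetical and are not Storm's wire format; in BackPressureStatus the payload bytes would be produced by kryo rather than passed in directly.

```java
import java.nio.ByteBuffer;

// Hypothetical message type; the layout below is illustrative only.
final class StatusMessage {
    static final short TYPE_CODE = -600; // made-up identifier for this message kind

    final long id;
    final byte[] payload;

    StatusMessage(long id, byte[] payload) {
        this.id = id;
        this.payload = payload;
    }

    // Layout: [short type][long id][int payload length][payload bytes]
    ByteBuffer buffer() {
        ByteBuffer out = ByteBuffer.allocate(2 + 8 + 4 + payload.length);
        out.putShort(TYPE_CODE);
        out.putLong(id);
        out.putInt(payload.length);
        out.put(payload);
        out.flip(); // switch from writing to reading
        return out;
    }

    // Reads one message, assuming the buffer is positioned at its type code.
    static StatusMessage read(ByteBuffer in) {
        short type = in.getShort();
        if (type != TYPE_CODE) {
            throw new IllegalArgumentException("Unexpected type code: " + type);
        }
        long id = in.getLong();
        byte[] payload = new byte[in.getInt()];
        in.get(payload);
        return new StatusMessage(id, payload);
    }

    public static void main(String[] args) {
        StatusMessage sent = new StatusMessage(7L, new byte[] {1, 2, 3});
        StatusMessage received = read(sent.buffer());
        System.out.println(received.id + " / " + received.payload.length + " bytes");
    }
}
```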
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:
https://github.com/apache/storm/pull/2502
I am not very familiar with RAS and how it works. It seems to be enabled by default. Can you elaborate on what setup (if any) was needed for RAS in your case?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r168044307
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -22,19 +22,23 @@
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
--- End diff --
OK. Please let me know if you plan to figure it out within the Storm 2.0.0 time frame. I'll add it to the epic for releasing Storm 2.0.0. Thanks!
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:
https://github.com/apache/storm/pull/2502
Yes sure. My storm.yaml is as follows:
```
storm.zookeeper.servers:
- "persistmist.corp.ne1.yahoo.com"
nimbus.seeds: ["persistmist.corp.ne1.yahoo.com", "localhost"]
storm.local.dir: "/tmp/apache-storm-2.0.0-SNAPSHOT/storm-local"
supervisor.run.worker.as.user: true
supervisor.worker.launcher: /etc/storm/worker-launcher
nimbus.authorizer: "org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer"
nimbus.supervisor.users:
- "mapredqa"
nimbus.admins:
- "mapredqa"
- "ethan"
nimbus.users:
- "mapredqa"
- "ethan"
ui.users:
- "mapredqa"
- "ethan"
logs.users:
- "mapredqa"
- "ethan"
storm.thrift.transport: "org.apache.storm.security.auth.kerberos.KerberosSaslTransportPlugin"
java.security.auth.login.config: "/jaas/storm_jaas.conf"
nimbus.childopts: "-Xmx1024m -Djava.security.auth.login.config=/jaas/storm_jaas.conf " # " -Dsun.security.krb5.debug=true"
ui.childopts: "-Xmx768m -Djava.security.auth.login.config=/jaas/storm_jaas.conf" # -Dsun.security.krb5.debug=true"
supervisor.childopts: "-Xmx256m -Djava.security.auth.login.config=/jaas/storm_jaas.conf"
storm.principal.tolocal: "org.apache.storm.security.auth.KerberosPrincipalToLocal"
storm.zookeeper.superACL: "sasl:mapredqa"
storm.blobstore.acl.validation.enabled: false
ui.header.buffer.bytes: 65536
ui.filter: "org.apache.hadoop.security.authentication.server.AuthenticationFilter"
ui.filter.params:
"type": "kerberos"
"kerberos.principal": "HTTP/persistmist.corp.ne1.yahoo.com"
"kerberos.keytab": "/keytabs/HTTP.keytab"
"kerberos.name.rules": "DEFAULT"
scheduler.display.resource: true
storm.scheduler: "org.apache.storm.scheduler.resource.ResourceAwareScheduler"
```
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167387844
--- Diff: pom.xml ---
@@ -259,6 +259,7 @@
<snakeyaml.version>1.11</snakeyaml.version>
<httpclient.version>4.3.3</httpclient.version>
<clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
+ <jctools.version>2.0.1</jctools.version>
--- End diff --
I guess I misread things, please ignore this comment.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167385590
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -44,14 +48,15 @@
private final Boolean isEventLoggers;
private final Boolean isDebug;
private final RotatingMap<Long, TupleInfo> pending;
+ private TupleInfo globalTupleInfo = new TupleInfo(); // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed).
--- End diff --
Accessing the Thread Local (TL) instance via ThreadLocal.get() typically involves a map lookup behind the scenes.
**Related Note:** I reluctantly used TL for latching on to the JCQueue.BatchInserter instance for the producers to JCQueue. Reluctantly, since I noticed a perf hit when doing some targeted microbenchmarking. I used it anyway because it was a perf improvement over the ConcurrentHashMap employed in Disruptor, and eliminating TL needed a bigger change to the interface and producers. I think it is possible to achieve a TL-free JCQueue and gain some perf, perhaps in a follow-up JIRA.
Although many decisions were measured, due to scope it was not feasible to measure each one. So, in the critical path, I have taken this general approach:
- avoid locks & synchronization, and try lock-free/wait-free approaches where synchronization is unavoidable.
- avoid map lookups and object creation
This was a case of avoiding synchronization, (TL) map lookups, and object allocation.
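The trade-off can be sketched as follows. `ScratchReuse` and `Info` are hypothetical names, not Storm classes; the sketch only contrasts a `ThreadLocal` lookup with a plain reused field, and makes no claim about measured cost.

```java
final class ScratchReuse {
    // Small mutable holder, standing in for something like a per-emit info record.
    static final class Info {
        long id;
        String stream;
    }

    // Option A: one instance per thread; every access goes through ThreadLocal.get().
    private static final ThreadLocal<Info> PER_THREAD = ThreadLocal.withInitial(Info::new);

    // Option B: one instance per collector; no lookup, but callers must not
    // share this collector across threads without external synchronization.
    private final Info reused = new Info();

    Info fillViaThreadLocal(long id, String stream) {
        Info info = PER_THREAD.get();
        info.id = id;
        info.stream = stream;
        return info;
    }

    Info fillViaField(long id, String stream) {
        reused.id = id;
        reused.stream = stream;
        return reused;
    }

    public static void main(String[] args) {
        ScratchReuse collector = new ScratchReuse();
        Info first = collector.fillViaField(1L, "default");
        Info second = collector.fillViaField(2L, "default");
        // Both references point at the same object, overwritten in place.
        System.out.println(first == second);
    }
}
```

Both options avoid a fresh allocation per call; option B also avoids the lookup, at the price of pushing the thread-safety obligation onto the caller.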
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r161358582
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -196,19 +197,21 @@ public static Executor mkExecutor(WorkerState workerState, List<Long> executorId
executor.stats = new BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()),ObjectReader.getInt(executor.getStormConf().get(Config.NUM_STAT_BUCKETS)));
}
+ int minId = Integer.MAX_VALUE;
Map<Integer, Task> idToTask = new HashMap<>();
for (Integer taskId : taskIds) {
+ minId = Math.min(minId, taskId);
try {
Task task = new Task(executor, taskId);
- executor.sendUnanchored(
- task, StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer());
+ task.sendUnanchored( StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer(), null); // TODO: Roshan: does this get delivered/handled anywhere ?
--- End diff --
I don't care too much, will retain if it has some use.
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:
https://github.com/apache/storm/pull/2502
- Thanks for giving it a spin and posting the numbers. The higher CPU in multi-worker mode is not something I have seen before. Will take a look. Can you please share the command line that you used? Were all 4 workers on the same host?
- The plan is to rerun the suite that was previously published and see if any regressions were introduced by recent changes and the rebasing, which I think brought in about 300 or so commits since the last perf run. Will do this once we have
- **Side Note:** If you run TVL again, do pay attention to the latency on the UI. TVL's report does not seem to include the actual latency.
Will squash the commits and refresh this PR so that it can be committed.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167331543
--- Diff: storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java ---
@@ -194,7 +194,12 @@ public void reportError(Throwable t) {
public void emitDirect(int task, String stream, List<Object> values, Object id) {
throw new UnsupportedOperationException("Trident does not support direct streams");
}
-
+
+ @Override
+ public void flush() {
+ //NOOP //TODO: Roshan: validate if this is OK
--- End diff --
TODO needs to go away. Is this OK to not have a flush?
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:
https://github.com/apache/storm/pull/2502
Yes, I only ran the performance test on a single node.
Script for running TVL:
* https://gist.github.com/HeartSaVioR/139a1a8ea6a6e285578598095d1c816f (for 4 workers)
* https://gist.github.com/HeartSaVioR/27300c7e476efb4b58e86656e2073505 (for 1 worker)
I'm also planning to share some scripts regarding verifying release, running daemons in a node via tmux/tmuxinator.
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:
https://github.com/apache/storm/pull/2502
I think I have addressed all the major and minor issues as well.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159960326
--- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex
}
}
+ public Queue<AddressedTuple> getPendingEmits() {
+ return pendingEmits;
+ }
+
/**
* separated from mkExecutor in order to replace executor transfer in executor data for testing
*/
public ExecutorShutdown execute() throws Exception {
LOG.info("Loading executor tasks " + componentId + ":" + executorId);
- registerBackpressure();
- Utils.SmartThread systemThreads =
- Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
-
String handlerName = componentId + "-executor" + executorId;
- Utils.SmartThread handlers =
+ Utils.SmartThread handler =
Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
setupTicks(StatsUtil.SPOUT.equals(type));
LOG.info("Finished loading executor " + componentId + ":" + executorId);
- return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
+ return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask);
}
public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
- @SuppressWarnings("unchecked")
@Override
- public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
- ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
- for (AddressedTuple addressedTuple : addressedTuples) {
- TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
- int taskId = addressedTuple.getDest();
- if (isDebug) {
- LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
- }
+ public void accept(Object event) {
+ if (event == JCQueue.INTERRUPT) {
+ throw new RuntimeException(new InterruptedException("JCQ processing interrupted") );
--- End diff --
Could you explain a bit more about when JCQueue.INTERRUPT happens and where this exception is caught/handled? Almost everywhere else that we use an InterruptedException, it is treated as the worker going down in an orderly manner, so we end the thread without complaining. I just want to be sure that we don't accidentally shut down part of the worker in some odd cases.
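To make the concern concrete, here is a sketch of the general pattern: a sentinel object turns into a wrapped `InterruptedException`, and the consuming loop unwraps it to tell an orderly stop from a real failure. `InterruptSentinel` and its methods are hypothetical; the sketch does not show where Storm actually catches this exception.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class InterruptSentinel {
    // Marker object placed on the queue to ask the consumer to stop.
    static final Object INTERRUPT = new Object();

    private InterruptSentinel() {
    }

    static void accept(Object event, List<Object> processed) {
        if (event == INTERRUPT) {
            throw new RuntimeException(new InterruptedException("processing interrupted"));
        }
        processed.add(event);
    }

    // True if any exception in the cause chain is an InterruptedException.
    static boolean causedByInterrupt(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof InterruptedException) {
                return true;
            }
        }
        return false;
    }

    // Returns true if the loop ended because of the sentinel (orderly stop),
    // false if the queue simply ran dry. Other failures are rethrown.
    static boolean drain(Queue<Object> queue, List<Object> processed) {
        try {
            Object event;
            while ((event = queue.poll()) != null) {
                accept(event, processed);
            }
            return false;
        } catch (RuntimeException e) {
            if (causedByInterrupt(e)) {
                return true; // end quietly; do not treat as an error
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        Queue<Object> queue = new ArrayDeque<>();
        queue.add("a");
        queue.add(INTERRUPT);
        queue.add("b");
        List<Object> processed = new ArrayList<>();
        System.out.println("orderly stop: " + drain(queue, processed));
        System.out.println("processed: " + processed + ", left on queue: " + queue);
    }
}
```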
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159962361
--- Diff: storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
@@ -17,72 +17,124 @@
*/
package org.apache.storm.executor;
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.EventHandler;
import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.DisruptorQueue;
-import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Queue;
-public class ExecutorTransfer implements EventHandler, Callable {
+// Every executor has an instance of this class
+public class ExecutorTransfer {
private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
private final WorkerState workerData;
- private final DisruptorQueue batchTransferQueue;
- private final Map<String, Object> topoConf;
private final KryoTupleSerializer serializer;
- private final MutableObject cachedEmit;
private final boolean isDebug;
+ private final int producerBatchSz;
+ private int remotesBatchSz = 0;
+ private int indexingBase = 0;
+ private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker
+ private ArrayList<JCQueue> queuesToFlush; // [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance
- public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map<String, Object> topoConf) {
+
+ public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
this.workerData = workerData;
- this.batchTransferQueue = batchTransferQueue;
- this.topoConf = topoConf;
this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
- this.cachedEmit = new MutableObject(new ArrayList<>());
this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
+ this.producerBatchSz = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+ }
+
+ // to be called after all Executor objects in the worker are created and before this object is used
+ public void initLocalRecvQueues() {
+ Integer minTaskId = workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
+ this.localReceiveQueues = Utils.convertToArray( workerData.getLocalReceiveQueues(), minTaskId);
+ this.indexingBase = minTaskId;
+ this.queuesToFlush = new ArrayList<JCQueue>(Collections.nCopies(localReceiveQueues.size(), null) );
}
- public void transfer(int task, Tuple tuple) {
- AddressedTuple val = new AddressedTuple(task, tuple);
+ // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
+ public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
if (isDebug) {
- LOG.info("TRANSFERRING tuple {}", val);
+ LOG.info("TRANSFERRING tuple {}", addressedTuple);
+ }
+
+ JCQueue localQueue = getLocalQueue(addressedTuple);
+ if (localQueue!=null) {
+ return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
+ } else {
+ if (remotesBatchSz >= producerBatchSz) {
+ if ( !workerData.tryFlushRemotes() ) {
+ if (pendingEmits != null) {
+ pendingEmits.add(addressedTuple);
+ }
+ return false;
+ }
+ remotesBatchSz = 0;
--- End diff --
Do we have a race condition here? I believe that this method can be called from multiple different threads, and if so then we now have to worry about remotesBatchSz staying consistent.
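For illustration only, this is roughly what a thread-safe batch counter could look like if the method does turn out to be shared between threads. `BatchCounter` and its methods are hypothetical names, not part of the PR, and the flush is replaced by a second counter. If `tryTransfer` is only ever called from the owning executor thread, the plain `int` in the diff is fine and cheaper.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; ExecutorTransfer in the diff uses a plain int field.
final class BatchCounter {
    private final int batchSize;
    private final AtomicInteger added = new AtomicInteger();
    private final AtomicInteger flushes = new AtomicInteger();

    BatchCounter(int batchSize) {
        this.batchSize = batchSize;
    }

    // Counts one item. Exactly one caller observes each multiple of batchSize,
    // so each full batch triggers exactly one flush even under contention.
    void add() {
        int n = added.incrementAndGet();
        if (n % batchSize == 0) {
            flushes.incrementAndGet(); // stand-in for the real flush call
        }
    }

    int added() {
        return added.get();
    }

    int flushes() {
        return flushes.get();
    }

    // Drives add() from several threads and returns {items added, flushes}.
    static int[] runConcurrently(int threads, int perThread, int batchSize) {
        BatchCounter counter = new BatchCounter(batchSize);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.add();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) {
                w.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return new int[] {counter.added(), counter.flushes()};
    }
}
```

Unlike the diff, this sketch never resets the counter to zero; it uses a modulo check so that no increment is lost between the threshold test and the reset.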
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:
https://github.com/apache/storm/pull/2502
Congrats @roshannaik, great effort and perseverance to get this in, and thanks to @revans2 for reviewing in great detail.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r160066094
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -177,6 +195,35 @@ public BuiltinMetrics getBuiltInMetrics() {
return builtInMetrics;
}
+
+ // Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument
+ public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
+ Tuple tuple = getTuple(stream, values);
+ List<Integer> tasks = getOutgoingTasks(stream, values);
+ for (Integer t : tasks) {
+ AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
+ transfer.tryTransfer(addressedTuple, pendingEmits);
+ }
+ }
+
+ /**
+ * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api).
+ */
+ public void sendToEventLogger(Executor executor, List values,
--- End diff --
I reread the code and found that the sampling percentage can be changed. I was thinking about reducing the calls to random.nextDouble(), but in this case we may not be able to do that. Please ignore my previous comment.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167288058
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Acker.java ---
@@ -99,17 +100,19 @@ public void execute(Tuple input) {
pending.put(id, curr);
} else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
resetTimeout = true;
- if (curr != null) {
+ if (curr == null) {
--- End diff --
Why is this change being made?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167288382
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java ---
@@ -362,6 +362,8 @@ public static void addSystemStreams(StormTopology topology) {
public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
+ if(numExecutors==null || numExecutors==0)
--- End diff --
nit: can we fix the style here by adding a space after the `if` and wrapping the body in '{' '}'?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167385890
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --
I was not aware of that. Do we have an example of what I should do instead?
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:
https://github.com/apache/storm/pull/2502
@Ethanlm I should be able to take a look in a couple of days. Can you share any settings you are using in the config file other than the one you noted on the command line (topology.debug=true)? Especially anything RAS related.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167293987
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
@@ -456,137 +450,135 @@ public void refreshStormActive(Runnable callback) {
}
}
- public void refreshThrottle() {
- boolean backpressure = stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::refreshThrottle);
- this.throttleOn.set(backpressure);
- }
-
- private static double getQueueLoad(DisruptorQueue q) {
- DisruptorQueue.QueueMetrics qMetrics = q.getMetrics();
+ private static double getQueueLoad(JCQueue q) {
+ JCQueue.QueueMetrics qMetrics = q.getMetrics();
return ((double) qMetrics.population()) / qMetrics.capacity();
}
public void refreshLoad(List<IRunningExecutor> execs) {
- Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(taskIds));
+ Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(localTaskIds));
Long now = System.currentTimeMillis();
Map<Integer, Double> localLoad = new HashMap<>();
- for (IRunningExecutor exec: execs) {
+ for (IRunningExecutor exec : execs) {
double receiveLoad = getQueueLoad(exec.getReceiveQueue());
- double sendLoad = getQueueLoad(exec.getSendQueue());
- localLoad.put(exec.getExecutorId().get(0).intValue(), Math.max(receiveLoad, sendLoad));
+ localLoad.put(exec.getExecutorId().get(0).intValue(), receiveLoad);
}
Map<Integer, Load> remoteLoad = new HashMap<>();
cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks)));
loadMapping.setLocal(localLoad);
loadMapping.setRemote(remoteLoad);
- if (now > nextUpdate.get()) {
+ if (now > nextLoadUpdate.get()) {
receiver.sendLoadMetrics(localLoad);
- nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
+ nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
}
}
+ // checks if the tasks which had back pressure are now free again. if so, sends an update to other workers
+ public void refreshBackPressureStatus() {
+ LOG.debug("Checking for change in Backpressure status on worker's tasks");
+ boolean bpSituationChanged = bpTracker.refreshBpTaskList();
+ if (bpSituationChanged) {
+ BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+ receiver.sendBackPressureStatus(bpStatus);
+ }
+ }
+
+
/**
* we will wait all connections to be ready and then activate the spout/bolt
* when the worker bootup.
*/
public void activateWorkerWhenAllConnectionsReady() {
int delaySecs = 0;
int recurSecs = 1;
- refreshActiveTimer.schedule(delaySecs, new Runnable() {
- @Override public void run() {
+ refreshActiveTimer.schedule(delaySecs,
+ () -> {
if (areAllConnectionsReady()) {
LOG.info("All connections are ready for worker {}:{} with id {}", assignmentId, port, workerId);
isWorkerActive.set(Boolean.TRUE);
} else {
refreshActiveTimer.schedule(recurSecs, () -> activateWorkerWhenAllConnectionsReady(), false, 0);
}
}
- });
+ );
}
public void registerCallbacks() {
LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
receiver.registerRecv(new DeserializingConnectionCallback(topologyConf,
getWorkerTopologyContext(),
- this::transferLocal));
+ this::transferLocalBatch));
+ // Send curr BackPressure status to new clients
+ receiver.registerNewConnectionResponse(
+ () -> {
+ BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+ LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus);
+ return bpStatus;
+ }
+ );
}
- public void transferLocal(List<AddressedTuple> tupleBatch) {
- Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
- for (AddressedTuple tuple : tupleBatch) {
- Integer executor = taskToShortExecutor.get(tuple.dest);
- if (null == executor) {
- LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
- continue;
- }
- List<AddressedTuple> current = grouped.get(executor);
- if (null == current) {
- current = new ArrayList<>();
- grouped.put(executor, current);
- }
- current.add(tuple);
- }
+ /* Not a Blocking call. If cannot emit, will add 'tuple' to pendingEmits and return 'false'. 'pendingEmits' can be null */
+ public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
+ return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
+ }
- for (Map.Entry<Integer, List<AddressedTuple>> entry : grouped.entrySet()) {
- DisruptorQueue queue = shortExecutorReceiveQueueMap.get(entry.getKey());
- if (null != queue) {
- queue.publish(entry.getValue());
- } else {
- LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
- }
- }
+ public void flushRemotes() throws InterruptedException {
+ workerTransfer.flushRemotes();
}
- public void transfer(KryoTupleSerializer serializer, List<AddressedTuple> tupleBatch) {
- if (trySerializeLocal) {
- assertCanSerialize(serializer, tupleBatch);
- }
- List<AddressedTuple> local = new ArrayList<>();
- Map<Integer, List<TaskMessage>> remoteMap = new HashMap<>();
- for (AddressedTuple addressedTuple : tupleBatch) {
- int destTask = addressedTuple.getDest();
- if (taskIds.contains(destTask)) {
- // Local task
- local.add(addressedTuple);
- } else {
- // Using java objects directly to avoid performance issues in java code
- if (! remoteMap.containsKey(destTask)) {
- remoteMap.put(destTask, new ArrayList<>());
+ public boolean tryFlushRemotes() {
+ return workerTransfer.tryFlushRemotes();
+ }
+
+ // Receives msgs from remote workers and feeds them to local executors. If any receiving local executor is under Back Pressure,
+ // informs other workers about back pressure situation. Runs in the NettyWorker thread.
+ private void transferLocalBatch(ArrayList<AddressedTuple> tupleBatch) {
+ int lastOverflowCount = 0; // overflowQ size at the time the last BPStatus was sent
+
+ for (int i = 0; i < tupleBatch.size(); i++) {
+ AddressedTuple tuple = tupleBatch.get(i);
+ JCQueue queue = shortExecutorReceiveQueueMap.get(tuple.dest);
+
+ // 1- try adding to main queue if its overflow is not empty
+ if (queue.isEmptyOverflow()) {
+ if (queue.tryPublish(tuple)) {
+ continue;
}
- remoteMap.get(destTask).add(new TaskMessage(destTask, serializer.serialize(addressedTuple.getTuple())));
}
- }
- if (!local.isEmpty()) {
- transferLocal(local);
- }
- if (!remoteMap.isEmpty()) {
- transferQueue.publish(remoteMap);
- }
- }
+ // 2- BP detected (i.e MainQ is full). So try adding to overflow
+ int currOverflowCount = queue.getOverflowCount();
+ if (bpTracker.recordBackPressure(tuple.dest, queue)) {
+ receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
+ lastOverflowCount = currOverflowCount;
+ } else {
- // TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
- public void sendTuplesToRemoteWorker(HashMap<Integer, ArrayList<TaskMessage>> packets, long seqId, boolean batchEnd) {
- drainer.add(packets);
- if (batchEnd) {
- ReentrantReadWriteLock.ReadLock readLock = endpointSocketLock.readLock();
- try {
- readLock.lock();
- drainer.send(cachedTaskToNodePort.get(), cachedNodeToPortSocket.get());
- } finally {
- readLock.unlock();
+ if (currOverflowCount - lastOverflowCount > 10000) {
+ // resend BP status, in case prev notification was missed or reordered
+ BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+ receiver.sendBackPressureStatus(bpStatus);
+ lastOverflowCount = currOverflowCount;
+ LOG.debug("Re-sent BackPressure Status. OverflowCount = {}, BP Status ID = {}. ", currOverflowCount, bpStatus.id);
+ }
+ }
+ if (!queue.tryPublishToOverflow(tuple)) {
+ dropMessage(tuple, queue);
}
- drainer.clear();
}
}
+ private void dropMessage(AddressedTuple tuple, JCQueue queue) {
+ ++dropCount;
+ queue.recordMsgDrop();
+ LOG.warn("Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}", queue.getName(), queue.getOverflowCount(), dropCount, tuple.toString());
--- End diff --
nit: `tuple.toString()` is not needed, as the logging will do it for you, but since this is a warn log that should almost never be called it really does not matter.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167383283
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Acker.java ---
@@ -99,17 +100,19 @@ public void execute(Tuple input) {
pending.put(id, curr);
} else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
resetTimeout = true;
- if (curr != null) {
+ if (curr == null) {
--- End diff --
Unintentional. This appears to be due to a mistake, most likely made when resolving merge conflicts when I rebased my code onto master sometime in Dec 2017. Will revert. Thanks for catching.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r164955129
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.utils.JCQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.storm.Constants.SYSTEM_TASK_ID;
+
+public class BackPressureTracker {
+ static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
+
+ private final Map<Integer, JCQueue> bpTasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
+ private final Set<Integer> nonBpTasks = ConcurrentHashMap.newKeySet();
+ private final String workerId;
+
+ public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
+ this.workerId = workerId;
+ this.nonBpTasks.addAll(allLocalTasks); // all tasks are considered to be not under BP initially
+ this.nonBpTasks.remove((int)SYSTEM_TASK_ID); // not tracking system task
+ }
+
+ /* called by transferLocalBatch() on NettyWorker thread
+ * returns true if an update was recorded, false if taskId is already under BP
+ */
+ public boolean recordBackpressure(Integer taskId, JCQueue recvQ) {
+ if (nonBpTasks.remove(taskId)) {
+ bpTasks.put(taskId, recvQ);
+ return true;
+ }
+ return false;
+ }
+
+ // returns true if there was a change in the BP situation
+ public boolean refreshBpTaskList() {
+ boolean changed = false;
+ LOG.debug("Running Back Pressure status change check");
+ for (Iterator<Entry<Integer, JCQueue>> itr = bpTasks.entrySet().iterator(); itr.hasNext(); ) {
+ Entry<Integer, JCQueue> entry = itr.next();
+ if (entry.getValue().isEmptyOverflow()) {
+ // move task from bpTasks to noBpTasks
+ nonBpTasks.add(entry.getKey());
+ itr.remove();
--- End diff --
Next update will simplify the logic by using a single map and fix this issue as well.
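As a rough idea of the direction, a single-map tracker could keep one entry per task with an atomic flag, instead of moving task ids between two collections. This is only a sketch under assumptions: `SingleMapBpTracker`, `TaskState`, and `register` are hypothetical names, a `BooleanSupplier` stands in for `JCQueue.isEmptyOverflow()`, and the actual follow-up commit may look different.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a single-map back pressure tracker.
final class SingleMapBpTracker {
    private static final class TaskState {
        final BooleanSupplier overflowEmpty; // stand-in for JCQueue.isEmptyOverflow()
        final AtomicBoolean underBp = new AtomicBoolean(false);

        TaskState(BooleanSupplier overflowEmpty) {
            this.overflowEmpty = overflowEmpty;
        }
    }

    // One entry per local task; entries are never removed, only flagged.
    private final Map<Integer, TaskState> tasks = new ConcurrentHashMap<>();

    void register(int taskId, BooleanSupplier overflowEmpty) {
        tasks.put(taskId, new TaskState(overflowEmpty));
    }

    // Returns true if the task newly entered back pressure, false if it was
    // already flagged or is not tracked.
    boolean recordBackPressure(int taskId) {
        TaskState state = tasks.get(taskId);
        return state != null && state.underBp.compareAndSet(false, true);
    }

    // Clears the flag of every task whose overflow has drained.
    // Returns true if at least one task changed state.
    boolean refreshBpTaskList() {
        boolean changed = false;
        for (TaskState state : tasks.values()) {
            if (state.underBp.get() && state.overflowEmpty.getAsBoolean()) {
                changed |= state.underBp.compareAndSet(true, false);
            }
        }
        return changed;
    }

    boolean isUnderBackPressure(int taskId) {
        TaskState state = tasks.get(taskId);
        return state != null && state.underBp.get();
    }
}
```

Because each transition is a single compare-and-set on one entry, a task cannot be briefly missing from both collections while it is being moved.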
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159968772
--- Diff: storm-client/src/jvm/org/apache/storm/utils/RunningAvg.java ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.utils;
+
+public class RunningAvg {
+
+ private long n = 0;
+ private double oldM, newM, oldS, newS;
+ private String name;
+ public static int printFreq = 20_000_000;
+ private boolean disable;
+ private long count = 0;
+
+ public RunningAvg(String name, boolean disable) {
+ this(name, printFreq, disable);
+ }
+
+ public RunningAvg(String name, int printFreq) {
+ this(name, printFreq, false);
+ }
+
+ public RunningAvg(String name, int printFreq, boolean disable) {
+ this.name = name + "_" + Thread.currentThread().getName();
+ this.printFreq = printFreq;
+ this.disable = disable;
+ }
+
+ public void clear() {
+ n = 0;
+ }
+
+ public void pushLatency(long startMs) {
+ push(System.currentTimeMillis() - startMs);
+ }
+
+ public void push(long x) {
+ if (disable) {
+ return;
+ }
+
+ n++;
+
+ if (n == 1) {
+ oldM = newM = x;
+ oldS = 0;
+ } else {
+ newM = oldM + (x - oldM) / n;
+ newS = oldS + (x - oldM) * (x - newM);
+
+ // set up for next iteration
+ oldM = newM;
+ oldS = newS;
+ }
+ if (++count == printFreq) {
+ System.err.printf(" ***> %s - %,.2f\n", name, mean());
--- End diff --
Can we log this instead of printing it? Also, could you file a follow-on JIRA so that when we move to a better metrics implementation, we use that instead of printing/logging?
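Something along these lines is what I have in mind. This is a sketch, not a drop-in replacement: `LoggedRunningAvg` is a hypothetical name, it tracks only the running mean, and it uses `java.util.logging` purely to stay self-contained, whereas the Storm code would use its existing SLF4J `Logger`.

```java
import java.util.logging.Logger;

// Hypothetical sketch: running mean that reports through a logger
// instead of System.err. Not thread safe, like the class in the diff.
final class LoggedRunningAvg {
    private static final Logger LOG = Logger.getLogger(LoggedRunningAvg.class.getName());

    private final String name;
    private final int reportEvery; // log once per this many samples
    private long n = 0;
    private double mean = 0.0;

    LoggedRunningAvg(String name, int reportEvery) {
        this.name = name;
        this.reportEvery = reportEvery;
    }

    // Incremental mean update: mean_n = mean_(n-1) + (x - mean_(n-1)) / n
    void push(long x) {
        n++;
        mean += (x - mean) / n;
        if (n % reportEvery == 0) {
            LOG.info(String.format("%s - %,.2f", name, mean));
        }
    }

    double mean() {
        return mean;
    }
}
```

The report interval is an instance field here, so constructing one instance does not change the interval of the others.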
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159969789
--- Diff: storm-server/src/main/java/org/apache/storm/Testing.java ---
@@ -712,6 +712,6 @@ public static Tuple testTuple(List<Object> values, MkTupleParam param) {
new HashMap<>(),
new HashMap<>(),
new AtomicBoolean(false));
- return new TupleImpl(context, values, 1, stream);
+ return new TupleImpl(context, values, "component", 1, stream);
--- End diff --
I think this is the cause of some of the test failures.
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:
https://github.com/apache/storm/pull/2502
Looks like the storm-core error is related to a worker crash while shutting down.
https://travis-ci.org/apache/storm/jobs/336372156
```
36913 [SLOT_1027] INFO o.a.s.e.ExecutorShutdown - Shut down executor 2:[3, 3]
36913 [SLOT_1027] INFO o.a.s.e.ExecutorShutdown - Shutting down executor 19d8dc30-7afc-44b1-a466-26064b50a580:[2, 2]
36913 [Thread-334-19d8dc30-7afc-44b1-a466-26064b50a580-executor[2, 2]] INFO o.a.s.u.Utils - Async loop interrupted!
36914 [SLOT_1027] INFO o.a.s.e.ExecutorShutdown - Shut down executor 19d8dc30-7afc-44b1-a466-26064b50a580:[2, 2]
36914 [SLOT_1027] INFO o.a.s.e.ExecutorShutdown - Shutting down executor 1:[1, 1]
36914 [Thread-335-1-executor[1, 1]] INFO o.a.s.u.Utils - Async loop interrupted!
36914 [SLOT_1027] INFO o.a.s.e.ExecutorShutdown - Shut down executor 1:[1, 1]
36914 [SLOT_1027] INFO o.a.s.d.w.Worker - Shut down executors
36914 [SLOT_1027] INFO o.a.s.d.w.Worker - Shutting down transfer thread
36914 [Worker-Transfer] ERROR o.a.s.u.Utils - Async loop died!
java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.storm.messaging.TaskMessage
at org.apache.storm.daemon.worker.WorkerTransfer.accept(WorkerTransfer.java:84) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:309) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.JCQueue.consume(JCQueue.java:290) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.JCQueue.consume(JCQueue.java:281) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.daemon.worker.WorkerTransfer.lambda$makeTransferThread$0(WorkerTransfer.java:75) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.Utils$2.run(Utils.java:350) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
36914 [Worker-Transfer] ERROR o.a.s.u.Utils - Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.storm.messaging.TaskMessage
at org.apache.storm.utils.Utils$2.run(Utils.java:363) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
Caused by: java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.storm.messaging.TaskMessage
at org.apache.storm.daemon.worker.WorkerTransfer.accept(WorkerTransfer.java:84) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:309) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.JCQueue.consume(JCQueue.java:290) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.JCQueue.consume(JCQueue.java:281) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.daemon.worker.WorkerTransfer.lambda$makeTransferThread$0(WorkerTransfer.java:75) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.Utils$2.run(Utils.java:350) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
... 1 more
36915 [Worker-Transfer] ERROR o.a.s.u.Utils - Halting process: Async loop died!
java.lang.RuntimeException: Halting process: Async loop died!
at org.apache.storm.utils.Utils.exitProcess(Utils.java:465) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.Utils$3.uncaughtException(Utils.java:373) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at java.lang.Thread.dispatchUncaughtException(Thread.java:1959) [?:1.8.0_151]
```
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159957697
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
@@ -155,134 +150,159 @@ public void start() throws Exception {
Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
@Override public Object run() throws Exception {
- workerState =
- new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
+ return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials);
+ }
+ }); // Subject.doAs(...)
+
+ }
+
+ private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, String> initCreds, Credentials initialCredentials)
+ throws Exception {
+ workerState =
+ new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
stormClusterState);
- // Heartbeat here so that worker process dies if this fails
- // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
- // that worker is running and moves on
- doHeartBeat();
+ // Heartbeat here so that worker process dies if this fails
+ // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
+ // that worker is running and moves on
+ doHeartBeat();
- executorsAtom = new AtomicReference<>(null);
+ executorsAtom = new AtomicReference<>(null);
- // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
- // to the supervisor
- workerState.heartbeatTimer
- .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
- try {
- doHeartBeat();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
+ // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
+ // to the supervisor
+ workerState.heartbeatTimer
+ .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
+ try {
+ doHeartBeat();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
- workerState.executorHeartbeatTimer
- .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
+ workerState.executorHeartbeatTimer
+ .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
Worker.this::doExecutorHeartbeats);
- workerState.registerCallbacks();
+ workerState.registerCallbacks();
- workerState.refreshConnections(null);
+ workerState.refreshConnections(null);
- workerState.activateWorkerWhenAllConnectionsReady();
+ workerState.activateWorkerWhenAllConnectionsReady();
- workerState.refreshStormActive(null);
+ workerState.refreshStormActive(null);
- workerState.runWorkerStartHooks();
+ workerState.runWorkerStartHooks();
- List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
- for (List<Long> e : workerState.getExecutors()) {
- if (ConfigUtils.isLocalMode(topologyConf)) {
- newExecutors.add(
- LocalExecutor.mkExecutor(workerState, e, initCreds)
- .execute());
- } else {
- newExecutors.add(
- Executor.mkExecutor(workerState, e, initCreds)
- .execute());
- }
- }
- executorsAtom.set(newExecutors);
+ List<Executor> execs = new ArrayList<>();
+ for (List<Long> e : workerState.getExecutors()) {
+ if (ConfigUtils.isLocalMode(topologyConf)) {
+ Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
+ execs.add( executor );
+ workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
+ } else {
+ Executor executor = Executor.mkExecutor(workerState, e, initCreds);
+ workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
+ execs.add(executor);
+ }
+ }
- EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
- .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
+ List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
+ for (Executor executor : execs) {
+ newExecutors.add(executor.execute());
+ }
+ executorsAtom.set(newExecutors);
- // This thread will publish the messages destined for remote tasks to remote connections
- transferThread = Utils.asyncLoop(() -> {
- workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
- return 0L;
- });
+ // This thread will send out messages destined for remote tasks (on other workers)
+ transferThread = workerState.makeTransferThread();
+ transferThread.setName("Worker-Transfer");
- DisruptorBackpressureCallback disruptorBackpressureHandler =
- mkDisruptorBackpressureHandler(workerState);
- workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
- workerState.transferQueue
- .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
- workerState.transferQueue
- .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
- workerState.transferQueue
- .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
-
- WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
- backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
- if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
- backpressureThread.start();
- stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
-
- int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
- workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
- }
+ credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
- credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
+ establishLogSettingCallback();
- establishLogSettingCallback();
+ workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
- workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
+ workerState.refreshCredentialsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
+ @Override public void run() {
+ checkCredentialsChanged();
+ }
+ });
- workerState.refreshCredentialsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
- @Override public void run() {
- checkCredentialsChanged();
- if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
- checkThrottleChanged();
- }
+ workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
+ (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
+ @Override public void run() {
+ try {
+ LOG.debug("Checking if blobs have updated");
+ updateBlobUpdates();
+ } catch (IOException e) {
+ // IOException from reading the version files to be ignored
+ LOG.error(e.getStackTrace().toString());
}
- });
-
- workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
- (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
- @Override public void run() {
- try {
- LOG.debug("Checking if blobs have updated");
- updateBlobUpdates();
- } catch (IOException e) {
- // IOException from reading the version files to be ignored
- LOG.error(e.getStackTrace().toString());
- }
- }
- });
-
- // The jitter allows the clients to get the data at different times, and avoids thundering herd
- if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
- workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
- }
+ }
+ });
+
+ // The jitter allows the clients to get the data at different times, and avoids thundering herd
+ if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
+ workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
+ }
+
+ workerState.refreshConnectionsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
- workerState.refreshConnectionsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
+ workerState.resetLogLevelsTimer.scheduleRecurring(0,
+ (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
- workerState.resetLogLevelsTimer.scheduleRecurring(0,
- (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
+ workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
+ workerState::refreshStormActive);
- workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
- workerState::refreshStormActive);
+ setupFlushTupleTimer(topologyConf, newExecutors);
+ setupBackPressureCheckTimer(topologyConf);
+
+ LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
--- End diff --
nit: I think we log this elsewhere too, so it might be good to remove this.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r168033466
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus {
+ public static final short IDENTIFIER = (short)-600;
+ private static final int SIZE_OF_ID = 2; // size if IDENTIFIER
+ private static final int SIZE_OF_INT = 4;
+
+ private static AtomicLong bpCount = new AtomicLong(0);
+
+ public String workerId;
+ public final long id; // monotonically increasing id
--- End diff --
OK, thanks for the clarification. I had read it as some kind of guarantee we need to ensure. It may be better to make clear that it is not a requirement and is only there for debugging purposes.
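One way to spell that out is a comment directly on the field. A minimal sketch, assuming the id is taken from the shared `bpCount` counter; the class name `BackPressureStatusSketch` and its constructor are hypothetical and not the PR's code:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for BackPressureStatus; only the id handling is shown.
final class BackPressureStatusSketch {
    private static final AtomicLong BP_COUNT = new AtomicLong(0);

    // Debugging aid only: helps correlate log lines on the sending and receiving side.
    // Nothing should rely on ids being gap-free or arriving in order.
    public final long id;
    public final String workerId;

    BackPressureStatusSketch(String workerId) {
        this.workerId = workerId;
        this.id = BP_COUNT.incrementAndGet();
    }
}
```

Within a single JVM each new instance gets a larger id than the previous one, but the comment makes it explicit that receivers are not expected to depend on that.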
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r161361184
--- Diff: storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java ---
@@ -24,50 +24,46 @@
import org.apache.storm.task.GeneralTopologyContext;
public class TupleImpl implements Tuple {
- private final List<Object> values;
- private final int taskId;
- private final String streamId;
- private final GeneralTopologyContext context;
- private final MessageId id;
+ private List<Object> values;
+ private int taskId;
+ private String streamId;
+ private GeneralTopologyContext context;
+ private MessageId id;
+ private final String srcComponent;
private Long _processSampleStartTime;
private Long _executeSampleStartTime;
private long _outAckVal = 0;
-
+
public TupleImpl(Tuple t) {
this.values = t.getValues();
this.taskId = t.getSourceTask();
this.streamId = t.getSourceStreamId();
this.id = t.getMessageId();
this.context = t.getContext();
- if (t instanceof TupleImpl) {
+ this.srcComponent = t.getSourceComponent();
+ try {
TupleImpl ti = (TupleImpl) t;
this._processSampleStartTime = ti._processSampleStartTime;
this._executeSampleStartTime = ti._executeSampleStartTime;
this._outAckVal = ti._outAckVal;
+ } catch (ClassCastException e) {
+ // ignore ... if t is not a TupleImpl type .. faster than checking and then casting
}
}
- public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
+ public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId, MessageId id) {
this.values = Collections.unmodifiableList(values);
this.taskId = taskId;
this.streamId = streamId;
this.id = id;
this.context = context;
-
- String componentId = context.getComponentId(taskId);
- Fields schema = context.getComponentOutputFields(componentId, streamId);
- if(values.size()!=schema.size()) {
- throw new IllegalArgumentException(
- "Tuple created with wrong number of fields. " +
- "Expected " + schema.size() + " fields but got " +
- values.size() + " fields");
- }
--- End diff --
OK, enabling it for local mode. I am doing my best to eliminate map lookups in the critical path.
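A rough sketch of what "only in local mode" could look like, assuming the expected field count can be resolved once up front. `FieldCountCheck` is a hypothetical helper, and a single output stream is assumed for simplicity; in the PR the check sits in the `TupleImpl` constructor.

```java
import java.util.List;

// Hypothetical helper; not part of the PR.
final class FieldCountCheck {
    private final boolean enabled;     // decided once, e.g. from "is local mode"
    private final int expectedFields;  // resolved once, outside the hot path

    FieldCountCheck(boolean enabled, int expectedFields) {
        this.enabled = enabled;
        this.expectedFields = expectedFields;
    }

    // Hot path: a single branch on a final boolean, no map lookups.
    void validate(List<Object> values) {
        if (enabled && values.size() != expectedFields) {
            throw new IllegalArgumentException(
                "Tuple created with wrong number of fields. Expected "
                    + expectedFields + " fields but got " + values.size() + " fields");
        }
    }
}
```

With the flag off the call costs one predictable branch, which keeps the sanity check available for local testing without putting lookups back on the cluster path.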
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r164956555
--- Diff: storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
@@ -17,72 +17,124 @@
*/
package org.apache.storm.executor;
-import com.google.common.annotations.VisibleForTesting;
-import com.lmax.disruptor.EventHandler;
import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.DisruptorQueue;
-import org.apache.storm.utils.MutableObject;
+import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Queue;
-public class ExecutorTransfer implements EventHandler, Callable {
+// Every executor has an instance of this class
+public class ExecutorTransfer {
private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
private final WorkerState workerData;
- private final DisruptorQueue batchTransferQueue;
- private final Map<String, Object> topoConf;
private final KryoTupleSerializer serializer;
- private final MutableObject cachedEmit;
private final boolean isDebug;
+ private final int producerBatchSz;
+ private int remotesBatchSz = 0;
+ private int indexingBase = 0;
+ private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker
+ private ArrayList<JCQueue> queuesToFlush; // [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance
- public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map<String, Object> topoConf) {
+
+ public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
this.workerData = workerData;
- this.batchTransferQueue = batchTransferQueue;
- this.topoConf = topoConf;
this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
- this.cachedEmit = new MutableObject(new ArrayList<>());
this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
+ this.producerBatchSz = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
+ }
+
+ // to be called after all Executor objects in the worker are created and before this object is used
+ public void initLocalRecvQueues() {
+ Integer minTaskId = workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
+ this.localReceiveQueues = Utils.convertToArray( workerData.getLocalReceiveQueues(), minTaskId);
+ this.indexingBase = minTaskId;
+ this.queuesToFlush = new ArrayList<JCQueue>(Collections.nCopies(localReceiveQueues.size(), null) );
}
- public void transfer(int task, Tuple tuple) {
- AddressedTuple val = new AddressedTuple(task, tuple);
+ // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
+ public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
if (isDebug) {
- LOG.info("TRANSFERRING tuple {}", val);
+ LOG.info("TRANSFERRING tuple {}", addressedTuple);
+ }
+
+ JCQueue localQueue = getLocalQueue(addressedTuple);
+ if (localQueue!=null) {
+ return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
+ } else {
+ if (remotesBatchSz >= producerBatchSz) {
+ if ( !workerData.tryFlushRemotes() ) {
+ if (pendingEmits != null) {
+ pendingEmits.add(addressedTuple);
+ }
+ return false;
+ }
+ remotesBatchSz = 0;
--- End diff --
@revans2
There is one instance of the ExecutorTransfer object per Executor, so the only way to invoke this concurrently is as follows:
- background threads are spun up by the spout/bolt executor, and
- outputcollector.emit() is called from them without any external synchronization.
Is that the situation you were thinking of?
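For reference, a minimal sketch of the "external synchronization" case, assuming user code that emits from its own background threads. `UnsafeBatchingEmitter` and `ExternalSyncDemo` are hypothetical names; the emitter only mimics a per-executor object with plain, unsynchronized counters.

```java
// Hypothetical stand-in for a per-executor, non-thread-safe emit path.
final class UnsafeBatchingEmitter {
    private int batched = 0;   // plain int, like a per-executor batch counter
    private int flushed = 0;
    private final int batchSize;

    UnsafeBatchingEmitter(int batchSize) {
        this.batchSize = batchSize;
    }

    // Not thread safe: read-modify-write on plain fields.
    void emit(Object tuple) {
        if (++batched >= batchSize) {
            flushed += batched;
            batched = 0;
        }
    }

    int total() {
        return flushed + batched;
    }
}

final class ExternalSyncDemo {
    // Starts `threads` background threads that each emit `perThread` tuples,
    // all guarded by one shared lock, and returns the total the emitter saw.
    static int run(int threads, int perThread) {
        UnsafeBatchingEmitter emitter = new UnsafeBatchingEmitter(100);
        Object lock = new Object();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    synchronized (lock) {   // the external synchronization
                        emitter.emit(n);
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        synchronized (lock) {
            return emitter.total();
        }
    }
}
```

Without the `synchronized (lock)` block around `emit`, updates to the plain counters could be lost, which is the kind of race being discussed for `remotesBatchSz`.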
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159963018
--- Diff: storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java ---
@@ -23,7 +23,7 @@
import java.io.Serializable;
import java.util.List;
-public class TupleInfo implements Serializable {
+public final class TupleInfo implements Serializable {
--- End diff --
Why do we want this to be final? Just curious.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167330523
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --
Why does this need to be serializable if we are explicitly using kryo for all of the serialization? Can we just register the class with kryo instead so this will also work when java serialization is disabled?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r161359459
--- Diff: storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java ---
@@ -20,6 +20,7 @@
import org.apache.storm.Config;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.StormTopology;
+import org.apache.storm.messaging.netty.BackPressureStatus;
--- End diff --
fixed.
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:
https://github.com/apache/storm/pull/2502
Hi, thanks for the great patch.
But I ran into some problems.
I ran
```
bin/storm jar storm-loadgen-*.jar org.apache.storm.loadgen.ThroughputVsLatency --spouts 1 --splitters 2 --counters 1 -c topology.debug=true
```
on ResourceAwareScheduler and it's not working properly.
It looks like the __acker-executor was not able to receive messages from spouts and bolts, and the spouts and bolts kept retrying to send messages to the acker. That then led to another problem:
https://issues.apache.org/jira/browse/STORM-2970
I ran the same test on the Storm build right before this merge and it worked properly. I then ran it on the build right after this merge and the issue appeared.
Could you please verify? Thanks!
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r168033940
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -22,19 +22,23 @@
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
--- End diff --
I have opened [STORM-2945](https://issues.apache.org/jira/browse/STORM-2945) to nail down and document the semantics of concurrent emits.
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:
https://github.com/apache/storm/pull/2502
@roshannaik Thanks for looking into this. I used the default configs on a secure cluster with ResourceAwareScheduler.
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:
https://github.com/apache/storm/pull/2502
**Update:**
- The code has been rebased to latest master.
- Getting clean test runs on my local machine
- Travis runs are indicating these test failures:
1. **storm-client:JCQueueTest** - failing on Travis for some reason but passing for me. I will look into it.
2. *storm-core* - The exact test and the nature of the failure are not clear from the Travis logs. I see some exceptions that mention MetricsStore ... this might not be related to this PR.
3. *org.apache.storm.cassandra.trident.MapStateTest* - Seems to be stalling and causing a failure.
The cause of the last two failures is unclear to me, as they pass on my local machine and I can't see anything interesting in the Travis logs.
@revans2: I have an open question for you about the race condition you brought up regarding ExecutorTransfer::remotesBatchSz. Everything else has been addressed.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167286543
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -890,30 +871,91 @@
public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
/**
- * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency
- * vs. CPU usage
+ * Selects the Bolt's Wait Strategy to use when there are no incoming msgs. Used to trade off latency vs CPU usage.
+ */
+ @isString
--- End diff --
Can we check if this is an instance of the proper parent interface?
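A minimal sketch of such a check using only reflection, assuming the config value is a fully qualified class name. `WaitStrategySketch`, `SleepingWaitSketch` and `ImplementsValidator` are hypothetical stand-ins, not Storm's validation framework.

```java
// Hypothetical parent interface and implementation, standing in for the real ones.
interface WaitStrategySketch {
    void idle();
}

final class SleepingWaitSketch implements WaitStrategySketch {
    @Override
    public void idle() {
        Thread.yield();
    }
}

final class ImplementsValidator {
    // Throws IllegalArgumentException unless `value` names a loadable class
    // that implements (or extends) `parent`.
    static void validate(String key, Object value, Class<?> parent) {
        if (!(value instanceof String)) {
            throw new IllegalArgumentException(key + " must be a class name string");
        }
        Class<?> candidate;
        try {
            candidate = Class.forName((String) value);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(key + ": class not found: " + value, e);
        }
        if (!parent.isAssignableFrom(candidate)) {
            throw new IllegalArgumentException(
                key + ": " + value + " does not implement " + parent.getName());
        }
    }
}
```

Compared with a plain string check, this rejects a typo or an unrelated class at config-validation time rather than when the executor first tries to instantiate the strategy.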
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167282875
--- Diff: docs/Concepts.md ---
@@ -113,3 +113,8 @@ Topologies execute across one or more worker processes. Each worker process is a
**Resources:**
* [Config.TOPOLOGY_WORKERS](javadocs/org/apache/storm/Config.html#TOPOLOGY_WORKERS): this config sets the number of workers to allocate for executing the topology
+
+### Performance Tuning
+
+Refer to [performance tuning guide](docs/CONTRIBUTING.md)
--- End diff --
I don't think `CONTRIBUTING.md` is the performance tuning guide. Also, the path is relative, and since `Performance.md` is in the same directory as this file, we don't need the `docs/` prefix in the link.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167388018
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --
Hmm, interesting.
I wonder why ControlMessage, MessageBatch and SaslMessageToken don't follow the same pattern of registering.
Can you confirm that you are suggesting the following steps?
- Remove the inheritance from the Serializable interface
- Register BackPressureStatus.class with Kryo
- Remove the BackPressureStatus.buffer() and BackPressureStatus.read() methods
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167800249
--- Diff: storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java ---
@@ -194,7 +194,12 @@ public void reportError(Throwable t) {
public void emitDirect(int task, String stream, List<Object> values, Object id) {
throw new UnsupportedOperationException("Trident does not support direct streams");
}
-
+
+ @Override
+ public void flush() {
+ //NOOP //TODO: Roshan: validate if this is OK
--- End diff --
needs to flush.
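One possible shape for that, sketched with hypothetical types (`CollectorSketch`, `RecordingCollector`, `ForwardingCollector`); how the real collector inside RichSpoutBatchExecutor should reach the underlying output path is an assumption here:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal collector interface; the real one has more methods.
interface CollectorSketch {
    void emit(List<Object> values);

    void flush();
}

// Records emits and counts flushes so the delegation can be observed.
final class RecordingCollector implements CollectorSketch {
    final List<List<Object>> emitted = new ArrayList<>();
    int flushCount = 0;

    @Override
    public void emit(List<Object> values) {
        emitted.add(values);
    }

    @Override
    public void flush() {
        flushCount++;
    }
}

// Wrapper that forwards flush() to its delegate instead of ignoring it.
final class ForwardingCollector implements CollectorSketch {
    private final CollectorSketch delegate;

    ForwardingCollector(CollectorSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public void emit(List<Object> values) {
        delegate.emit(values);
    }

    @Override
    public void flush() {
        delegate.flush();   // a no-op here could leave batched tuples stranded
    }
}
```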
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r161358387
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java ---
@@ -22,24 +22,25 @@
public class SpoutThrottlingMetrics extends BuiltinMetrics {
private final CountMetric skippedMaxSpoutMs = new CountMetric();
- private final CountMetric skippedThrottleMs = new CountMetric();
private final CountMetric skippedInactiveMs = new CountMetric();
+ private final CountMetric skippedBackPressureMs = new CountMetric();
public SpoutThrottlingMetrics() {
metricMap.put("skipped-max-spout-ms", skippedMaxSpoutMs);
- metricMap.put("skipped-throttle-ms", skippedThrottleMs);
metricMap.put("skipped-inactive-ms", skippedInactiveMs);
+ metricMap.put("skipped-backpressure-ms", skippedBackPressureMs);
--- End diff --
fixed
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r164971328
--- Diff: storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java ---
@@ -89,63 +107,80 @@ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, L
}
}
}
+ msgId = MessageId.makeId(anchorsToIds);
+ } else {
+ msgId = MessageId.makeUnanchored();
}
- MessageId msgId = MessageId.makeId(anchorsToIds);
- TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
- executor.getExecutorTransfer().transfer(t, tupleExt);
+ TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), taskId, streamId, msgId);
+ xsfer.tryTransfer(new AddressedTuple(t, tupleExt), executor.getPendingEmits());
}
if (isEventLoggers) {
- executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random);
+ task.sendToEventLogger(executor, values, executor.getComponentId(), null, random, executor.getPendingEmits());
}
return outTasks;
}
@Override
public void ack(Tuple input) {
+ if(!ackingEnabled)
+ return;
long ackValue = ((TupleImpl) input).getAckVal();
Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
- executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID,
+ task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID,
new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
- executor.getExecutorTransfer());
+ executor.getExecutorTransfer(), executor.getPendingEmits());
}
long delta = tupleTimeDelta((TupleImpl) input);
if (isDebug) {
LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
}
- BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
- boltAckInfo.applyOn(taskData.getUserContext());
+
+ if ( !task.getUserContext().getHooks().isEmpty() ) {
+ BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
+ boltAckInfo.applyOn(task.getUserContext());
+ }
if (delta >= 0) {
- ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
- input.getSourceComponent(), input.getSourceStreamId(), delta);
+ executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
}
}
@Override
public void fail(Tuple input) {
+ if(!ackingEnabled)
+ return;
Set<Long> roots = input.getMessageId().getAnchors();
for (Long root : roots) {
- executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
- new Values(root), executor.getExecutorTransfer());
+ task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
+ new Values(root), executor.getExecutorTransfer(), executor.getPendingEmits());
}
long delta = tupleTimeDelta((TupleImpl) input);
if (isDebug) {
LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
}
BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
- boltFailInfo.applyOn(taskData.getUserContext());
- if (delta >= 0) {
- ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
- input.getSourceComponent(), input.getSourceStreamId(), delta);
+ boltFailInfo.applyOn(task.getUserContext());
+ if (delta != 0) {
--- End diff --
Looks like a missed spot: this should be `delta >= 0`.
https://github.com/apache/storm/pull/2241/files?diff=split#r158213916
```
(when (<= 0 delta)
(stats/bolt-failed-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
```
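To make the difference concrete, here is a small sketch comparing the two guards. `DeltaGuard` is a hypothetical helper, and it assumes a negative delta means that no sample start time was recorded for the tuple:

```java
// Hypothetical helper; mirrors the guard in the Clojure snippet above.
final class DeltaGuard {
    // Intended condition: record stats for any non-negative delta, including 0 ms.
    static boolean shouldRecord(long delta) {
        return delta >= 0;
    }

    // The condition as written in the diff, kept only for comparison.
    static boolean shouldRecordAsWritten(long delta) {
        return delta != 0;
    }
}
```

With `delta != 0`, a tuple that failed within the same millisecond (delta of 0) is dropped from the stats, while a negative delta is recorded; `delta >= 0` does the opposite in both cases.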
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:
https://github.com/apache/storm/pull/2502
@roshannaik
Hmm... OK. Interesting result. Since you're planning to refresh the numbers in the doc before Storm 2.0.0, we can revisit them then to see whether any odd results show up.
FYI my desktop spec and OS here:
OS: Ubuntu 17.10, 4.13.0-32-generic #35-Ubuntu SMP Thu Jan 25 09:13:46 UTC 2018 x86_64 x86_64 x86_64
CPU: AMD Ryzen 5 1600 3.2Ghz 6 core (with hyper-thread = 12 logical cores)
RAM: Samsung DDR4 32G 19200
SSD: Samsung 850 Evo
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:
https://github.com/apache/storm/pull/2502
@Ethanlm Can you please open a JIRA for this issue?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r159949952
--- Diff: docs/Performance.md ---
@@ -0,0 +1,132 @@
+---
--- End diff --
Great Documentation, but can we have some of the other docs link to it?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167325584
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.daemon.worker;
+
+import org.apache.storm.Config;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.policy.IWaitStrategy;
+import org.apache.storm.serialization.ITupleSerializer;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.utils.JCQueue;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.TransferDrainer;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Utils.SmartThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+// Transfers messages destined to other workers
+class WorkerTransfer implements JCQueue.Consumer {
+ static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class);
+
+ private final TransferDrainer drainer;
+ private WorkerState workerState;
+ private IWaitStrategy backPressureWaitStrategy;
+
+ JCQueue transferQueue; // [remoteTaskId] -> JCQueue. Some entries maybe null (if no emits to those tasksIds from this worker)
--- End diff --
Nit: why does this need to be package private? Can we lock it down more?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r168037754
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -22,19 +22,23 @@
import org.apache.storm.executor.TupleInfo;
import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.MessageId;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
import org.apache.storm.utils.MutableLong;
import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
--- End diff --
It is to figure out what we will have for Storm 2.0, since after that we cannot make any breaking changes until 3.0, even if we would like to.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167387629
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --
You should be able to get away with just registering the class and not providing a serializer.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167807269
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -44,14 +48,15 @@
private final Boolean isEventLoggers;
private final Boolean isDebug;
private final RotatingMap<Long, TupleInfo> pending;
+ private TupleInfo globalTupleInfo = new TupleInfo(); // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed).
--- End diff --
Getting the id of the current thread involves a call to Thread.currentThread(), which is quite [expensive](http://www.jutils.com/checks/performance.html), so it is not a good way to decide whether or not to use the fast path.
As a compromise, I am introducing that check only when topology.debug is enabled. This mode could be used for any dev-time checks that are unnecessary or too expensive to repeat in production.
I have opened [STORM-2945](https://issues.apache.org/jira/browse/STORM-2945) to nail down and document background emits support. We can document both the spout and bolt support semantics together in the same document.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167625247
--- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
@@ -40,109 +40,117 @@
import org.apache.storm.nimbus.NimbusInfo;
public interface IStormClusterState {
- public List<String> assignments(Runnable callback);
+ List<String> assignments(Runnable callback);
- public Assignment assignmentInfo(String stormId, Runnable callback);
+ Assignment assignmentInfo(String stormId, Runnable callback);
- public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
+ VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
- public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
+ Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
- public List<String> blobstoreInfo(String blobKey);
+ List<String> blobstoreInfo(String blobKey);
- public List<NimbusSummary> nimbuses();
+ List<NimbusSummary> nimbuses();
- public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
+ void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
- public List<String> activeStorms();
+ List<String> activeStorms();
/**
* Get a storm base for a topology
* @param stormId the id of the topology
* @param callback something to call if the data changes (best effort)
* @return the StormBase or null if it is not alive.
*/
- public StormBase stormBase(String stormId, Runnable callback);
+ StormBase stormBase(String stormId, Runnable callback);
- public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
+ ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
- public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
+ List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
- public List<ProfileRequest> getTopologyProfileRequests(String stormId);
+ List<ProfileRequest> getTopologyProfileRequests(String stormId);
- public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
+ void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
- public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
+ void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
- public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
+ Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
- public List<String> supervisors(Runnable callback);
+ List<String> supervisors(Runnable callback);
- public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
+ SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
- public void setupHeatbeats(String stormId);
+ void setupHeatbeats(String stormId);
- public void teardownHeartbeats(String stormId);
+ void teardownHeartbeats(String stormId);
- public void teardownTopologyErrors(String stormId);
+ void teardownTopologyErrors(String stormId);
- public List<String> heartbeatStorms();
+ List<String> heartbeatStorms();
- public List<String> errorTopologies();
+ List<String> errorTopologies();
- public List<String> backpressureTopologies();
+ /** @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */
--- End diff --
done. 3.0.0
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/storm/pull/2502
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2502
Yes I checked it out wrong... My bad.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r168013771
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
@@ -28,19 +28,21 @@
import java.util.concurrent.atomic.AtomicLong;
// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
-public class BackPressureStatus implements java.io.Serializable {
- static final long serialVersionUID = 1L;
+public class BackPressureStatus {
public static final short IDENTIFIER = (short)-600;
private static final int SIZE_OF_ID = 2; // size if IDENTIFIER
private static final int SIZE_OF_INT = 4;
private static AtomicLong bpCount = new AtomicLong(0);
- public final String workerId;
+ public String workerId;
public final long id; // monotonically increasing id
--- End diff --
Just wondering: being `monotonically increasing` guarantees that `id` is unique within a worker, but not across workers, and it also resets to 0 after a worker crashes and restarts. Could that hurt the backpressure logic in any way?
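To make the question concrete, here is a minimal sketch of the uniqueness property being asked about. It is not Storm code: `WorkerIdSource` and its methods are hypothetical, each instance stands in for one worker process, and starting the counter at 0 with `getAndIncrement()` is an assumption about how the ids would be handed out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for one worker process; not part of Storm.
final class WorkerIdSource {
    private final String workerId;
    // Per-"process" counter: a fresh instance (new or restarted worker) starts at 0 again.
    private final AtomicLong counter = new AtomicLong(0);

    WorkerIdSource(String workerId) {
        this.workerId = workerId;
    }

    // Monotonically increasing, but only unique within this instance.
    long nextId() {
        return counter.getAndIncrement();
    }

    // Pairing the counter with the worker identifier distinguishes different workers.
    String nextQualifiedId() {
        return workerId + ":" + counter.getAndIncrement();
    }

    public static void main(String[] args) {
        WorkerIdSource a = new WorkerIdSource("worker-a");
        WorkerIdSource b = new WorkerIdSource("worker-b");
        System.out.println(a.nextId() == b.nextId());          // same raw id on both workers
        System.out.println(a.nextQualifiedId() + " vs " + b.nextQualifiedId());
    }
}
```

Two instances hand out the same raw ids, so a consumer that compares bare `id` values across workers could confuse them. Pairing the id with the worker identifier avoids that, although a restart that keeps the same identifier would still repeat ids.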
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167286165
--- Diff: pom.xml ---
@@ -259,6 +259,7 @@
<snakeyaml.version>1.11</snakeyaml.version>
<httpclient.version>4.3.3</httpclient.version>
<clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
+ <jctools.version>2.0.1</jctools.version>
--- End diff --
Why does flux need jctools? Shouldn't it come with storm-client?
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167381302
--- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
@@ -40,109 +40,117 @@
import org.apache.storm.nimbus.NimbusInfo;
public interface IStormClusterState {
- public List<String> assignments(Runnable callback);
+ List<String> assignments(Runnable callback);
- public Assignment assignmentInfo(String stormId, Runnable callback);
+ Assignment assignmentInfo(String stormId, Runnable callback);
- public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
+ VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
- public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
+ Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
- public List<String> blobstoreInfo(String blobKey);
+ List<String> blobstoreInfo(String blobKey);
- public List<NimbusSummary> nimbuses();
+ List<NimbusSummary> nimbuses();
- public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
+ void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
- public List<String> activeStorms();
+ List<String> activeStorms();
/**
* Get a storm base for a topology
* @param stormId the id of the topology
* @param callback something to call if the data changes (best effort)
* @return the StormBase or null if it is not alive.
*/
- public StormBase stormBase(String stormId, Runnable callback);
+ StormBase stormBase(String stormId, Runnable callback);
- public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
+ ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
- public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
+ List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
- public List<ProfileRequest> getTopologyProfileRequests(String stormId);
+ List<ProfileRequest> getTopologyProfileRequests(String stormId);
- public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
+ void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
- public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
+ void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
- public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
+ Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
- public List<String> supervisors(Runnable callback);
+ List<String> supervisors(Runnable callback);
- public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
+ SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
- public void setupHeatbeats(String stormId);
+ void setupHeatbeats(String stormId);
- public void teardownHeartbeats(String stormId);
+ void teardownHeartbeats(String stormId);
- public void teardownTopologyErrors(String stormId);
+ void teardownTopologyErrors(String stormId);
- public List<String> heartbeatStorms();
+ List<String> heartbeatStorms();
- public List<String> errorTopologies();
+ List<String> errorTopologies();
- public List<String> backpressureTopologies();
+ /** @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */
--- End diff --
Filed STORM-2944. Can you please add 3.0 as a version in JIRA so that this issue can be associated with it?
---
[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/2502
This looks really really good. I ran through the unit tests and some performance tests and everything looks great. Now I feel like I can get excited about this going in. I am going to spend tomorrow going through the code, but I am really hopeful that we can get this merged in next week.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167287026
--- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java ---
@@ -97,6 +97,8 @@ public void run() {
// events.
Time.sleep(1000);
}
+ if(Thread.interrupted())
+ this.active.set(false);
--- End diff --
Nit: can we wrap this in `{` and `}` and put a space after the `if` to match the style guide?
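For illustration, a self-contained sketch of the braced form. The class `InterruptAwareLoop` is hypothetical and only mirrors the two lines from the diff; it is not the actual `StormTimer` code. It also shows that `Thread.interrupted()` clears the interrupt flag as it reads it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical holder for the check under review; not the real StormTimer.
final class InterruptAwareLoop {
    private final AtomicBoolean active = new AtomicBoolean(true);

    // Braces around the body and a space after `if`, as the style nit asks.
    void checkInterrupt() {
        if (Thread.interrupted()) {
            this.active.set(false);
        }
    }

    boolean isActive() {
        return active.get();
    }

    public static void main(String[] args) {
        InterruptAwareLoop loop = new InterruptAwareLoop();
        Thread.currentThread().interrupt();   // simulate an interrupt request
        loop.checkInterrupt();                // observes and clears the flag
        System.out.println(loop.isActive());
    }
}
```

Because `Thread.interrupted()` resets the flag, later code on the same thread no longer sees the interrupt, so recording it in `active` is what preserves the shutdown signal.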
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167387808
--- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
@@ -44,14 +48,15 @@
private final Boolean isEventLoggers;
private final Boolean isDebug;
private final RotatingMap<Long, TupleInfo> pending;
+ private TupleInfo globalTupleInfo = new TupleInfo(); // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed).
--- End diff --
Can we do a sanity check for the fast path, plus documentation? Check whether the thread id is the same as the id of the main thread we expect emits to come from. If so, we go with the fast path; if not, we use a thread local or do some kind of locking. The documentation should explain why you never want the spout to emit from a background thread.
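A minimal sketch of the suggested check, using the thread-local fallback rather than locking. All names here (`ScratchHolder`, `Scratch`, `acquire`) are hypothetical and stand in for the reused `TupleInfo` object. It also assumes the holder is constructed on the thread that emits are expected to come from.

```java
// Hypothetical sketch of "owner thread takes the fast path"; not Storm's collector.
final class ScratchHolder {
    // Stand-in for a reusable, non-thread-safe scratch object such as TupleInfo.
    static final class Scratch {
        String stream;
        Object messageId;
    }

    // Assumption: the constructor runs on the thread that normally emits.
    private final long ownerThreadId = Thread.currentThread().getId();
    private final Scratch ownerScratch = new Scratch();
    // Fallback: every other thread gets its own scratch instance.
    private final ThreadLocal<Scratch> otherScratch = ThreadLocal.withInitial(Scratch::new);

    Scratch acquire() {
        if (Thread.currentThread().getId() == ownerThreadId) {
            return ownerScratch;       // fast path: the single shared instance, no ThreadLocal lookup
        }
        return otherScratch.get();     // slow path: per-thread instance, never shared
    }
}
```

The owner thread always gets the same instance back, while a background thread gets a private one, so the shared scratch object is never touched concurrently. The cost is one `Thread.currentThread()` call per `acquire()`.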
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167814459
--- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
+public class BackPressureStatus implements java.io.Serializable {
--- End diff --
Thanks for the detailed comment. Made the changes and tested them as well.
---
[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...
Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r160011014
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -177,6 +195,35 @@ public BuiltinMetrics getBuiltInMetrics() {
return builtInMetrics;
}
+
+ // Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument
+ public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
+ Tuple tuple = getTuple(stream, values);
+ List<Integer> tasks = getOutgoingTasks(stream, values);
+ for (Integer t : tasks) {
+ AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
+ transfer.tryTransfer(addressedTuple, pendingEmits);
+ }
+ }
+
+ /**
+ * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api).
+ */
+ public void sendToEventLogger(Executor executor, List values,
--- End diff --
@HeartSaVioR you pointed out that some optimizations are possible here, which we can tackle in another JIRA. Can you elaborate, or capture your thoughts in a JIRA?
---