Posted to dev@storm.apache.org by roshannaik <gi...@git.apache.org> on 2018/01/05 04:16:41 UTC

[GitHub] storm pull request #2502: new PR for STORM-2306

GitHub user roshannaik opened a pull request:

    https://github.com/apache/storm/pull/2502

    new PR for STORM-2306

    The [old PR page](https://github.com/apache/storm/pull/2241) had become unusable due to the large number of old comments, so this new PR has been created for the same change.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/roshannaik/storm STORM-2306-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2502.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2502
    
----
commit 25cce136d256ea07183d6503246abe6c211ac522
Author: Roshan Naik <ro...@...>
Date:   2017-07-25T03:01:00Z

    Messaging subsystem redesign. Rebased to latest master. Validated compilation and a few simple topology runs. C->N: 8 mil/sec. 2.8 mil/sec with 2 workers. 1 mil/sec with ack (30-800 microsec). C->ID->N: 6.2 mil/sec.

commit 4153a2f6fb5036a216b285ca00c3cce8656f9996
Author: Roshan Naik <ro...@...>
Date:   2017-07-26T03:34:48Z

    addressing Satish's review comments

commit 2f070c30d09469a08edcb752ebdece61f4abddab
Author: Roshan Naik <ro...@...>
Date:   2017-08-01T07:39:10Z

    addressing review comments

commit ec4430616428d79c852e5a6a1fe0c64c61c4d021
Author: Roshan Naik <ro...@...>
Date:   2017-08-02T06:41:35Z

    Added Bolt Sleep strategy

commit 4a76691e744e16de9a58a44e359330856373da79
Author: Roshan Naik <ro...@...>
Date:   2017-08-03T19:38:42Z

    Added BackPressure Sleep Strategy. Shutdown bug fix

commit b052564ac7b66537815b7a0137a558f2927ddf98
Author: Roshan Naik <ro...@...>
Date:   2017-08-04T03:56:12Z

    Changing defaults for tighter latency and favor lower/medium throughput topologies

commit 8eea6aa0457c280ad582a3112b6828b6c9e60a00
Author: Roshan Naik <ro...@...>
Date:   2017-08-08T03:29:31Z

    Some minor simplifications. Overriding settings within storm-perf topos as needed

commit 300a4f84eed239c08ea1f2ac0c4945353f0555e6
Author: Roshan Naik <ro...@...>
Date:   2017-08-08T03:30:17Z

    resolving runtime exception issues due to conflict with STORM-2672

commit 23b5f0979f44f157c0634296bd311ad6b290e276
Author: Robert Evans <ev...@...>
Date:   2017-08-08T15:44:14Z

    Merge branch 'STORM-2306m' of https://github.com/roshannaik/storm into STORM-2306
    
    This is a test, this is only a test

commit 1c628d1ba1eeea81e21b5d59a5cb5eaabea6c03b
Author: Robert Evans <ev...@...>
Date:   2017-08-08T20:44:51Z

    FIX some of the tests and metrics for the system bolt

commit b75192e1ae0311747bf00b5a6d320fc0e662183f
Author: roshannaik <ro...@...>
Date:   2017-08-11T20:07:03Z

    Merge pull request #3 from revans2/STORM-2306
    
    FIX some of the tests and metrics for the system bolt

commit 2506c6eaa9f04f3c57e6875d6078135ac99b8163
Author: Roshan Naik <ro...@...>
Date:   2017-08-12T00:37:58Z

    System Bolt doesn't get much incoming traffic, so it can use its own predefined sleep strategy (consider making this configurable if useful)

commit 47579ea936cde21f87973ad4b73226737bfe806e
Author: Roshan Naik <ro...@...>
Date:   2017-08-15T09:01:09Z

    Fixing process_latency bug reported by Bobby

commit 4704245e44b0868cfa5237f97a1fca0686df11a9
Author: Roshan Naik <ro...@...>
Date:   2017-08-18T03:33:55Z

    Fixing the bug of wrong conf used for flushing as reported by Bobby. Adding Perf tuning document

commit 5cb42cbb247052eac1c7858192db09a583921258
Author: Roshan Naik <ro...@...>
Date:   2017-08-19T00:41:20Z

    Bug fix in BackPressure mode WaitStrategy to use the right settings. Updates to Performance.md

commit 942fbe6fd60fffe9d1cdc08e960d4b7752fc69ad
Author: Roshan Naik <ro...@...>
Date:   2017-08-21T21:35:29Z

    Arun's fix for OOM issue in Netty code (workerTransfer)

commit 5c223be2f774de9a9f4dded8450ef181f897a67a
Author: Roshan Naik <ro...@...>
Date:   2017-08-22T03:24:56Z

    Flush Tuples need to be put into the Worker Transfer Queue as well

commit b1a99a038319dfe15d4ac4dfde819f0008ce5291
Author: Roshan Naik <ro...@...>
Date:   2017-08-28T23:11:40Z

    Allow ConstSpout to have a configurable sleep for throttling. Improve defaults for sleep strategy, queue size & max.spout.pending.

commit f9beb5358c91a799bec49f071a8f68153042a48a
Author: Roshan Naik <ro...@...>
Date:   2017-08-29T05:29:31Z

    Revert changes in ShuffleGrouping until there is consensus on thread safety in groupers.

commit 6cd1424956a0d95f36a6b10158107dfd4ac02a1b
Author: Roshan Naik <ro...@...>
Date:   2017-08-31T10:54:24Z

    Introduce overflow to avoid deadlocked cycles involving ACKer. Added more non-blocking methods to JCQueue. Reverting max.spout.pending to null. Minor updates to ConstSpoutNullBolt Topo. Noticed a bug (not fixed) that can lead to OOM in multiworker mode

commit 6bfda34cfdc945cd0cb58c7e13369d577129e6b2
Author: roshannaik <ro...@...>
Date:   2017-08-31T11:03:59Z

    Merge branch 'master' into STORM-2306m

commit ece4ce2d8c829da53cfb48448d4f988110e237aa
Author: Roshan Naik <ro...@...>
Date:   2017-09-01T21:10:34Z

    bug fix in SpoutOutputCollectorImpl.emit()

commit ba0b9e2a2a5d5a99c50f2506a7f76a261fa19fc0
Author: Roshan Naik <ro...@...>
Date:   2017-09-14T07:47:00Z

    Moving worker transfer code out of WorkerState into WorkerTransfer.java. Inter-worker: 3 mil/sec. With ack: 700k/sec, 1.3 ms.

commit c275bfc11c12ff743ccf26c5bbc8db08e7c34030
Author: Roshan Naik <ro...@...>
Date:   2017-11-14T23:27:31Z

    - Squishing recent commits related to Interworker backpressure communication
    - Fixed some issues found during multi-worker testing
    - Fixes to perf topos to pick up cmd line args correctly
    - Removing support for single producer mode in JCQueue
    - Almost all msg transfers are non-blocking now (other than metrics and credentials-change notifications). Allows spouts and bolts to process metrics ticks even if the output path is choked due to backpressure
    - Some temporary debug logs that need to be removed after testing
    - Needs more multiworker mode testing

commit a6f84c6784eb1a66b968b6b01262e76a136f3d89
Author: Roshan Naik <ro...@...>
Date:   2017-11-30T10:19:33Z

    - Fixing a bug in tryTransferLocal() that caused Trident issues.
    - Adding workerID to BpStatus for better debuggability
    - Logging the length of an idle stretch for BP & max.spout.pending wait situations
    - Changes to defaults: topology.executor.receive.buffer.size=32k (rounded up to a power of 2), topology.flush.tuple.freq.millis=1 (same as master)
    - minor fixes and improvements

commit d0aa1ad27b9375a2c393c760427e24f3158441b8
Author: Roshan Naik <ro...@...>
Date:   2017-11-30T10:21:52Z

    Change logLevels. Fix unit test

commit b6fc2b8aaa92c41f786b53039989b373cbf6cd1d
Author: Roshan Naik <ro...@...>
Date:   2017-12-03T09:08:35Z

    Addressing minor TODOs

commit 39b395d4f5b3c5fa50a1ebdb08333264d40a9308
Author: Roshan Naik <ro...@...>
Date:   2017-12-05T01:12:21Z

    - Renaming config settings for easier understanding (Progressive Wait Strategy settings, plus topology.flush.tuple.freq.millis -> topology.batch.flush.interval.millis)

commit 1f35bc9c16795177704ffd8eed4500aa735c24e3
Author: Roshan Naik <ro...@...>
Date:   2017-12-05T01:13:34Z

    - Added message Drop Metrics
    - Added BackPressure unit tests

commit b179a1937ec46ac0651c88b15b9e35dbfd55aef6
Author: Roshan Naik <ro...@...>
Date:   2017-12-05T05:05:29Z

    Client.send throws IOException if connection closes. Allows it to recover from remote worker dying

----
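Commit a6f84c6 above sets the default topology.executor.receive.buffer.size to 32k, rounded up to a power of 2. The following is a minimal sketch of that kind of rounding, assuming the receive queue wants a power-of-two capacity (common for ring-buffer style queues; the commit message does not say why). The class and method names are hypothetical and not part of Storm.

```java
// Hypothetical helper, not a Storm API: round a requested queue capacity
// up to the next power of two.
final class QueueSizing {
    private QueueSizing() {}

    /** Returns the smallest power of two >= requested; requested must be in [1, 2^30]. */
    static int roundUpToPowerOfTwo(int requested) {
        if (requested < 1 || requested > (1 << 30)) {
            throw new IllegalArgumentException("capacity out of range: " + requested);
        }
        // highestOneBit(n - 1) << 1 is the next power of two for n >= 2;
        // n == 1 is special-cased because highestOneBit(0) is 0.
        return requested == 1 ? 1 : Integer.highestOneBit(requested - 1) << 1;
    }
}
```

For example, `roundUpToPowerOfTwo(32000)` returns 32768, and a value that is already a power of two, such as 32768, is returned unchanged.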


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159960500
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex
             }
         }
     
    +    public Queue<AddressedTuple> getPendingEmits() {
    +        return pendingEmits;
    +    }
    +
         /**
          * separated from mkExecutor in order to replace executor transfer in executor data for testing
          */
         public ExecutorShutdown execute() throws Exception {
             LOG.info("Loading executor tasks " + componentId + ":" + executorId);
     
    -        registerBackpressure();
    -        Utils.SmartThread systemThreads =
    -                Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
    -
             String handlerName = componentId + "-executor" + executorId;
    -        Utils.SmartThread handlers =
    +        Utils.SmartThread handler =
                     Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
             setupTicks(StatsUtil.SPOUT.equals(type));
     
             LOG.info("Finished loading executor " + componentId + ":" + executorId);
    -        return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
    +        return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask);
         }
     
         public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
     
    -    @SuppressWarnings("unchecked")
         @Override
    -    public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
    -        ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
    -        for (AddressedTuple addressedTuple : addressedTuples) {
    -            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
    -            int taskId = addressedTuple.getDest();
    -            if (isDebug) {
    -                LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
    -            }
    +    public void accept(Object event) {
    +        if (event == JCQueue.INTERRUPT) {
    +            throw new RuntimeException(new InterruptedException("JCQ processing interrupted") );
    +        }
    +        AddressedTuple addressedTuple =  (AddressedTuple)event;
    +        int taskId = addressedTuple.getDest();
    +
    +        TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
    +        if (isDebug) {
    +            LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
    +        }
    +
    +        try {
                 if (taskId != AddressedTuple.BROADCAST_DEST) {
                     tupleActionFn(taskId, tuple);
                 } else {
                     for (Integer t : taskIds) {
                         tupleActionFn(t, tuple);
                     }
                 }
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    --- End diff --
    
    Where is this exception handled?
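Because `accept(Object)` cannot throw checked exceptions, the diff wraps them in a `RuntimeException`, so something in the loop that drives `accept()` has to catch it. The diff passes `reportErrorDie` to `Utils.asyncLoop`, but whether that path catches this exception is not shown here. The sketch below illustrates one possible shape, under stated assumptions: a drain loop that stops on a sentinel (similar in spirit to `JCQueue.INTERRUPT`), unwraps the wrapped cause, and hands it to an error callback. `ConsumeLoopSketch`, `drain`, and `onError` are hypothetical names, not Storm APIs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch: a consumer loop that forwards a handler's failure
// to an error callback instead of letting it escape unnoticed.
final class ConsumeLoopSketch {
    // Sentinel asking the consumer to stop, similar in spirit to JCQueue.INTERRUPT.
    static final Object INTERRUPT = new Object();

    /**
     * Drains queued events into handler until the queue is empty, INTERRUPT is
     * seen, or the handler fails. A failure is reported to onError.
     * Returns the number of events handled successfully.
     */
    static int drain(BlockingQueue<Object> queue, Consumer<Object> handler,
                     Consumer<Throwable> onError) {
        int handled = 0;
        Object event;
        while ((event = queue.poll()) != null) {
            if (event == INTERRUPT) {
                break;
            }
            try {
                handler.accept(event);
                handled++;
            } catch (RuntimeException e) {
                // Handlers wrap checked exceptions; report the original cause if there is one.
                Throwable cause = e.getCause() != null ? e.getCause() : e;
                onError.accept(cause);
                break;
            }
        }
        return handled;
    }
}
```

Stopping at the first failure mirrors a "report and die" policy; a loop that logged and continued would be the other obvious choice.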


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159958378
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -155,134 +150,159 @@ public void start() throws Exception {
     
             Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
                 @Override public Object run() throws Exception {
    -                workerState =
    -                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
    +                return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials);
    +            }
    +        }); // Subject.doAs(...)
    +
    +    }
    +
    +    private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, String> initCreds, Credentials initialCredentials)
    +            throws Exception {
    +        workerState =
    +                new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
                             stormClusterState);
     
    -                // Heartbeat here so that worker process dies if this fails
    -                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    -                // that worker is running and moves on
    -                doHeartBeat();
    +        // Heartbeat here so that worker process dies if this fails
    +        // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    +        // that worker is running and moves on
    +        doHeartBeat();
     
    -                executorsAtom = new AtomicReference<>(null);
    +        executorsAtom = new AtomicReference<>(null);
     
    -                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    -                // to the supervisor
    -                workerState.heartbeatTimer
    -                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    -                        try {
    -                            doHeartBeat();
    -                        } catch (IOException e) {
    -                            throw new RuntimeException(e);
    -                        }
    -                    });
    +        // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    +        // to the supervisor
    +        workerState.heartbeatTimer
    +                .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                    try {
    +                        doHeartBeat();
    +                    } catch (IOException e) {
    +                        throw new RuntimeException(e);
    +                    }
    +                });
     
    -                workerState.executorHeartbeatTimer
    -                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +        workerState.executorHeartbeatTimer
    +                .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
                             Worker.this::doExecutorHeartbeats);
     
    -                workerState.registerCallbacks();
    +        workerState.registerCallbacks();
     
    -                workerState.refreshConnections(null);
    +        workerState.refreshConnections(null);
     
    -                workerState.activateWorkerWhenAllConnectionsReady();
    +        workerState.activateWorkerWhenAllConnectionsReady();
     
    -                workerState.refreshStormActive(null);
    +        workerState.refreshStormActive(null);
     
    -                workerState.runWorkerStartHooks();
    +        workerState.runWorkerStartHooks();
     
    -                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    -                for (List<Long> e : workerState.getExecutors()) {
    -                    if (ConfigUtils.isLocalMode(topologyConf)) {
    -                        newExecutors.add(
    -                            LocalExecutor.mkExecutor(workerState, e, initCreds)
    -                                .execute());
    -                    } else {
    -                        newExecutors.add(
    -                            Executor.mkExecutor(workerState, e, initCreds)
    -                                .execute());
    -                    }
    -                }
    -                executorsAtom.set(newExecutors);
    +        List<Executor> execs = new ArrayList<>();
    +        for (List<Long> e : workerState.getExecutors()) {
    +            if (ConfigUtils.isLocalMode(topologyConf)) {
    +                Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
    +                execs.add( executor );
    +                workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
    +            } else {
    +                Executor executor = Executor.mkExecutor(workerState, e, initCreds);
    +                workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
    +                execs.add(executor);
    +            }
    +        }
     
    -                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
    -                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +        List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    +        for (Executor executor : execs) {
    +            newExecutors.add(executor.execute());
    +        }
    +        executorsAtom.set(newExecutors);
     
    -                // This thread will publish the messages destined for remote tasks to remote connections
    -                transferThread = Utils.asyncLoop(() -> {
    -                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    -                    return 0L;
    -                });
    +        // This thread will send out messages destined for remote tasks (on other workers)
    +        transferThread = workerState.makeTransferThread();
    +        transferThread.setName("Worker-Transfer");
     
    -                DisruptorBackpressureCallback disruptorBackpressureHandler =
    -                    mkDisruptorBackpressureHandler(workerState);
    -                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    -                workerState.transferQueue
    -                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    -                workerState.transferQueue
    -                    .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    -                workerState.transferQueue
    -                    .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    -
    -                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
    -                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
    -                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    -                    backpressureThread.start();
    -                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
    -                    
    -                    int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
    -                    workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
    -                }
    +        credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
     
    -                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
    +        establishLogSettingCallback();
     
    -                establishLogSettingCallback();
    +        workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
     
    -                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
    +        workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
    +                    @Override public void run() {
    +                        checkCredentialsChanged();
    +                    }
    +                });
     
    -                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
    -                        @Override public void run() {
    -                            checkCredentialsChanged();
    -                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    -                               checkThrottleChanged();
    -                            }
    +        workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    +                (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
    +                    @Override public void run() {
    +                        try {
    +                            LOG.debug("Checking if blobs have updated");
    +                            updateBlobUpdates();
    +                        } catch (IOException e) {
    +                            // IOException from reading the version files to be ignored
    +                            LOG.error(e.getStackTrace().toString());
                             }
    -                    });
    -
    -                workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    -                        (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
    -                            @Override public void run() {
    -                                try {
    -                                    LOG.debug("Checking if blobs have updated");
    -                                    updateBlobUpdates();
    -                                } catch (IOException e) {
    -                                    // IOException from reading the version files to be ignored
    -                                    LOG.error(e.getStackTrace().toString());
    -                                }
    -                            }
    -                        });
    -
    -                // The jitter allows the clients to get the data at different times, and avoids thundering herd
    -                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    -                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
    -                }
    +                    }
    +                });
    +
    +        // The jitter allows the clients to get the data at different times, and avoids thundering herd
    +        if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    +            workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
    +        }
    +
    +        workerState.refreshConnectionsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
     
    -                workerState.refreshConnectionsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
    +        workerState.resetLogLevelsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
     
    -                workerState.resetLogLevelsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
    +        workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    +                workerState::refreshStormActive);
     
    -                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    -                    workerState::refreshStormActive);
    +        setupFlushTupleTimer(topologyConf, newExecutors);
    +        setupBackPressureCheckTimer(topologyConf);
    +
    +        LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    +        LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
    +        return this;
    +    }
     
    -                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    -                LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
    -                return this;
    -            };
    +    private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final List<IRunningExecutor> executors) {
    +        final Integer producerBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
    +        final Integer xferBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
    +        final Long flushIntervalMillis = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));
    +        if ((producerBatchSize == 1 && xferBatchSize == 1) || flushIntervalMillis == 0) {
    +            LOG.info("Flush Tuple generation disabled. producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", producerBatchSize, xferBatchSize, flushIntervalMillis);
    +            return;
    +        }
    +
    +        workerState.flushTupleTimer.scheduleRecurringMs(flushIntervalMillis, flushIntervalMillis, new Runnable() {
    +            @Override
    +            public void run() {
    +                // send flush tuple to all executors
    +                for (int i = 0; i < executors.size(); i++) {
    +                    IRunningExecutor exec = executors.get(i);
    +                    if (exec.getExecutorId().get(0) != Constants.SYSTEM_TASK_ID) {
    +                        exec.getExecutor().publishFlushTuple();
    +                    }
    +                }
    +            }
             });
    +        LOG.info("Flush tuple will be generated every {} millis", flushIntervalMillis);
    +    }
     
    +    private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
    +        final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
    +        if (workerCount <= 1) {
    +            LOG.info("BackPressure change checking is disabled as there is only one worker");
    +            return;
    +        }
    +        final Long bpCheckIntervalMs = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_CHECK_MILLIS));
    +        workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs, bpCheckIntervalMs, new Runnable() {
    --- End diff --
    
    nit: Could we use Java 8 lambdas here?
    
    ```
    workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs, bpCheckIntervalMs,
        () -> workerState.refreshBackPressureStatus());
    ```
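The same diff already passes method references such as `workerState::refreshLoad` and `workerState::refreshConnections` to these timers, so `Runnable` is accepted as a functional interface there. The sketch below shows that the anonymous class, the suggested lambda, and a method reference are interchangeable `Runnable`s. `RunnableStyles` is a hypothetical class, and its `refreshBackPressureStatus()` is only a stand-in for the worker method named in the suggestion.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: three equivalent ways to build the Runnable handed to a timer.
final class RunnableStyles {
    private final AtomicInteger refreshes = new AtomicInteger();

    // Stand-in for workerState.refreshBackPressureStatus(); just counts calls.
    void refreshBackPressureStatus() {
        refreshes.incrementAndGet();
    }

    int refreshCount() {
        return refreshes.get();
    }

    // Style used in the diff: anonymous inner class.
    Runnable asAnonymousClass() {
        return new Runnable() {
            @Override
            public void run() {
                refreshBackPressureStatus();
            }
        };
    }

    // Style suggested in the review: Java 8 lambda.
    Runnable asLambda() {
        return () -> refreshBackPressureStatus();
    }

    // Shorter still: a method reference, matching workerState::refreshLoad elsewhere in the diff.
    Runnable asMethodReference() {
        return this::refreshBackPressureStatus;
    }
}
```

All three invoke the same method when run; the method-reference form is the most compact and matches the style already used for the other timers.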


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159955584
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java ---
    @@ -22,24 +22,25 @@
     
     public class SpoutThrottlingMetrics extends BuiltinMetrics {
         private final CountMetric skippedMaxSpoutMs = new CountMetric();
    -    private final CountMetric skippedThrottleMs = new CountMetric();
         private final CountMetric skippedInactiveMs = new CountMetric();
    +    private final CountMetric skippedBackPressureMs = new CountMetric();
     
         public SpoutThrottlingMetrics() {
             metricMap.put("skipped-max-spout-ms", skippedMaxSpoutMs);
    -        metricMap.put("skipped-throttle-ms", skippedThrottleMs);
             metricMap.put("skipped-inactive-ms", skippedInactiveMs);
    +        metricMap.put("skipped-backpressure-ms", skippedBackPressureMs);
    --- End diff --
    
    ./docs/Metrics.md describes these. It should be updated to remove skipped-throttle-ms and replace it with skipped-backpressure-ms (preferably noting that skipped-throttle-ms in older versions of Storm measured something similar).


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r161359446
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java ---
    @@ -23,7 +23,7 @@
     import java.io.Serializable;
     import java.util.List;
     
    -public class TupleInfo implements Serializable {
    +public final class TupleInfo implements Serializable {
    --- End diff --
    
    I was wondering whether marking some of the frequently allocated and used objects as final impacts performance. It's not necessary, so I will revert it.


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    @roshannaik I may have checked it out wrong, I will try again...


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    Commits squashed. Thanks. Will use follow-up JIRAs to address any issues discovered.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167328981
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
    @@ -44,14 +48,15 @@
         private final Boolean isEventLoggers;
         private final Boolean isDebug;
         private final RotatingMap<Long, TupleInfo> pending;
    +    private TupleInfo globalTupleInfo = new TupleInfo();  // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed).
    --- End diff --
    
    This feels like a good assumption for a spout, but I would like to understand the cost of making this thread safe (a thread-local instance, etc.), and at least document it if that cost is high, or preferably find a low-cost way to throw an exception if concurrent use does happen.
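One low-cost way to throw if a second thread calls emit is a thread-ownership check: the first caller claims the collector, and later calls pay one volatile read on the fast path. This is a minimal sketch, not Storm code; `SingleThreadGuard` and `check()` are hypothetical names, and any actual cost would need measuring. Note that it is stricter than the assumption in the diff comment: it rejects a second thread even if the caller synchronizes externally. A `ThreadLocal<TupleInfo>` would instead allow multiple threads, at the cost of a lookup on every emit.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: detect use of a single-threaded collector from a second thread.
final class SingleThreadGuard {
    // The first thread to call check() becomes the owner; it is never reset.
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    /** Call at the start of each emit; throws if invoked from a non-owner thread. */
    void check() {
        Thread current = Thread.currentThread();
        Thread o = owner.get();            // fast path: a single volatile read
        if (o == current) {
            return;
        }
        if (o == null && owner.compareAndSet(null, current)) {
            return;                        // first caller claims ownership
        }
        // Either another thread already owned it, or it won the race to claim it.
        throw new IllegalStateException("emit called from thread '" + current.getName()
                + "' but collector is owned by '" + owner.get().getName() + "'");
    }
}
```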


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    I am still seeing test failures when running `mvn clean install -Pall-tests -fn | tee log.txt`
    
    `integration.org.apache.storm.integration-test`: there is an NPE at ExecutorTransfer.java:114 that appears to be an issue with initializing the local receive queues.
    
    `storm-starter` is also getting several errors. `test-intermediate-rankings-bolt` is getting an array index out of bounds exception using `RankableObjectWithFields`.
    
    `test-exclamation-bolt`, `test-total-rankings-bolt` and `test-rolling-bolt` are getting an NPE at GeneralTopologyContext.java:112, which suggests the component-to-stream table is not being set up properly, at least for the Clojure API.
    
    I'll see if there is more that I can come up with, and I'll try to spend more time looking at the actual code changes.



---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r168015462
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import org.apache.storm.serialization.KryoValuesDeserializer;
    +import org.apache.storm.serialization.KryoValuesSerializer;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
    +public class BackPressureStatus {
    +    public static final short IDENTIFIER = (short)-600;
    +    private static final int SIZE_OF_ID = 2; // size if IDENTIFIER
    +    private static final int SIZE_OF_INT = 4;
    +
    +    private static AtomicLong bpCount = new AtomicLong(0);
    +
    +    public String workerId;
    +    public final long id;                       // monotonically increasing id
    --- End diff --
    
    Sorry for the duplicated comment: I had commented on the old commit.
    
    Just wondering: being `monotonically increasing` guarantees that `id` is unique within a worker, but not across workers, and it also resets to 0 after a worker crashes and restarts. Could that hurt the backpressure logic by any chance?
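    
    Because `bpCount` is a `static AtomicLong`, `id` is only unique within one worker JVM. If the receiving side ever uses `id` to discard stale or duplicate statuses, it would need to scope that comparison per worker and handle the reset after a restart. The sketch below assumes a hypothetical `incarnation` value, such as the worker process start time, that is sent alongside `workerId` and `id`. Neither that field nor `BackPressureTracker` exists in the PR.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Hypothetical: decides whether a BackPressureStatus-like message is newer than
     * the last one seen from the same worker.
     */
    class BackPressureTracker {
        private static final class Last {
            final long incarnation;
            final long id;
    
            Last(long incarnation, long id) {
                this.incarnation = incarnation;
                this.id = id;
            }
        }
    
        // ids are only comparable within one worker process, so track them per workerId
        private final Map<String, Last> lastSeen = new HashMap<>();
    
        /** Returns true if the status should be applied, false if it is stale or a duplicate. */
        boolean accept(String workerId, long incarnation, long id) {
            Last prev = lastSeen.get(workerId);
            if (prev != null) {
                if (incarnation < prev.incarnation) {
                    return false; // late message from a process that has since restarted
                }
                if (incarnation == prev.incarnation && id <= prev.id) {
                    return false; // same process, not newer
                }
            }
            // New worker, newer id, or a restarted process whose ids began again at 0.
            lastSeen.put(workerId, new Last(incarnation, id));
            return true;
        }
    }
    ```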


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167285848
  
    --- Diff: flux/flux-core/src/test/java/org/apache/storm/flux/multilang/MultilangEnvironmentTest.java ---
    @@ -18,6 +18,7 @@
     package org.apache.storm.flux.multilang;
     
     
    +import org.junit.Ignore;
    --- End diff --
    
    nit: I don't think this is used in here, so can we remove it?


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r168032846
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
    @@ -22,19 +22,23 @@
     import org.apache.storm.executor.TupleInfo;
     import org.apache.storm.spout.ISpout;
     import org.apache.storm.spout.ISpoutOutputCollector;
    +import org.apache.storm.tuple.AddressedTuple;
     import org.apache.storm.tuple.MessageId;
     import org.apache.storm.tuple.TupleImpl;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.apache.storm.utils.MutableLong;
     import org.apache.storm.utils.RotatingMap;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.util.ArrayList;
     import java.util.List;
     import java.util.Random;
     
    +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
    --- End diff --
    
    nit: it would be better to make this a javadoc comment so that it is exposed in more places (for example, in generated API docs and IDE tooltips).
    
    As @revans2 stated, I also think this is a good assumption for a spout, but it would be even better to update the documented restrictions, if we have documented any. That's just my 2 cents, not a blocker.


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    @revans2, UTs for the storm-client module were fixed (including JCQueueTest). They run cleanly for me. Are you seeing them fail in your local setup?
    
    **Rebasing:** Because of the length of the review cycle for this PR and the need for a stable baseline for perf runs, I have so far avoided keeping up to date with the latest master. I plan to address all review comments first. There are around 300 commits on master to catch up on right now, so I think the rebase will take me about a week.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159968476
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java ---
    @@ -76,6 +76,32 @@ public static Integer getInt(Object o, Integer defaultValue) {
             throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
         }
     
    +    public static Long getLong(Object o) {
    +        return getLong(o, null);
    +    }
    +
    +    public static Long getLong(Object o, Long defaultValue) {
    +        if (null == o) {
    +            return defaultValue;
    +        }
    +
    +        if ( o instanceof Long ||
    +                o instanceof Integer ||
    +                o instanceof Short ||
    +                o instanceof Byte) {
    +            return ((Number) o).longValue();
    +        } else if (o instanceof Double) {
    +            final long l = (Long) o;
    --- End diff --
    
    This is not going to work here, as a Long can never be larger than MAX_VALUE for a long or smaller than MIN_VALUE for a Long. Technically, I think there may be a bug in getInt if the number is larger than a Long can hold.
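    
    For reference, here is a minimal sketch of a `Double` branch that does work. It assumes the intent is to accept only integral doubles that fit in a `long`, and the class name `ObjectReaderSketch` is hypothetical. Two details matter. First, the `Double` has to be unboxed as a `double`, because casting it to `Long` throws `ClassCastException`. Second, the upper bound has to be compared against 2^63 rather than `Long.MAX_VALUE`, because `Long.MAX_VALUE` rounds up to 2^63 when it is converted to `double`.
    
    ```java
    /** Hypothetical sketch of ObjectReader.getLong with a range-checked Double branch. */
    class ObjectReaderSketch {
        static Long getLong(Object o) {
            return getLong(o, null);
        }
    
        static Long getLong(Object o, Long defaultValue) {
            if (null == o) {
                return defaultValue;
            }
            if (o instanceof Long || o instanceof Integer || o instanceof Short || o instanceof Byte) {
                return ((Number) o).longValue();
            } else if (o instanceof Double) {
                final double d = (Double) o; // unbox; (Long) o would throw ClassCastException
                // 0x1p63 is exactly 2^63; -2^63 is Long.MIN_VALUE and is representable.
                // NaN fails every comparison and falls through to the exception below.
                if (d >= -0x1p63 && d < 0x1p63 && d == Math.rint(d)) {
                    return (long) d;
                }
            }
            throw new IllegalArgumentException("Don't know how to convert " + o + " to long");
        }
    }
    ```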


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    @revans2  and @HeartSaVioR
    I just rebased this PR and also included the fix that makes ExecutorTransfer.tryTransfer() thread safe, to allow concurrent emits from background threads spun up by the Bolt/Spout executors.
    
    I am hoping that, after this PR is merged, we can revisit how to allow concurrent emits without making the transfer path internally thread safe.
    This fix should unblock this PR and allow us to discuss that issue separately.
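    
    The simplest way to make a transfer path safe for concurrent emits is to guard its non-thread-safe batch with a lock. The sketch below shows that approach. `LockedTransfer` and its batching are hypothetical stand-ins, not the PR's `ExecutorTransfer`.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;
    
    /** Hypothetical stand-in for a transfer path that batches tuples before publishing. */
    class LockedTransfer {
        private final ReentrantLock lock = new ReentrantLock();
        private final List<Object> batch = new ArrayList<>(); // not thread safe on its own
        private final int batchSize;
        private long published = 0;
    
        LockedTransfer(int batchSize) {
            this.batchSize = batchSize;
        }
    
        /** Safe to call from the executor thread and from user-spawned background threads. */
        void transfer(Object tuple) {
            lock.lock();
            try {
                batch.add(tuple);
                if (batch.size() >= batchSize) {
                    published += batch.size(); // stand-in for publishing the batch downstream
                    batch.clear();
                }
            } finally {
                lock.unlock();
            }
        }
    
        /** Tuples published so far plus those still buffered. */
        long totalSeen() {
            lock.lock();
            try {
                return published + batch.size();
            } finally {
                lock.unlock();
            }
        }
    }
    ```
    
    An uncontended `ReentrantLock` is cheap, but it still adds work to every emit. That includes topologies that never emit from background threads, which is presumably the cost worth revisiting later.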
     



---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r168057328
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
    @@ -22,19 +22,23 @@
     import org.apache.storm.executor.TupleInfo;
     import org.apache.storm.spout.ISpout;
     import org.apache.storm.spout.ISpoutOutputCollector;
    +import org.apache.storm.tuple.AddressedTuple;
     import org.apache.storm.tuple.MessageId;
     import org.apache.storm.tuple.TupleImpl;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.apache.storm.utils.MutableLong;
     import org.apache.storm.utils.RotatingMap;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.util.ArrayList;
     import java.util.List;
     import java.util.Random;
     
    +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
    --- End diff --
    
    Done.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159959648
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -196,19 +197,21 @@ public static Executor mkExecutor(WorkerState workerState, List<Long> executorId
                 executor.stats = new BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()),ObjectReader.getInt(executor.getStormConf().get(Config.NUM_STAT_BUCKETS)));
             }
     
    +        int minId = Integer.MAX_VALUE;
             Map<Integer, Task> idToTask = new HashMap<>();
             for (Integer taskId : taskIds) {
    +            minId = Math.min(minId, taskId);
                 try {
                     Task task = new Task(executor, taskId);
    -                executor.sendUnanchored(
    -                        task, StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer());
    +                task.sendUnanchored( StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer(), null); // TODO: Roshan: does this get delivered/handled anywhere ?
    --- End diff --
    
    Answer: storm-core/test/clj/integration/org/apache/storm/integration_test.clj
    
    It can sometimes be helpful for debugging when you have topology.debug enabled, but it is not that critical and can probably be removed if you want. If you keep it, please remove the TODO comment.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159954152
  
    --- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java ---
    @@ -193,6 +210,24 @@ public void run() {
             });
         }
     
    +    /**
    +     * Schedule a function to run recurrently
    +     * @param delayMs the number of millis to delay before running the function
    +     * @param recurMs the time between each invocation
    +     * @param func the function to run
    +     */
    +    public void scheduleRecurringMs(long delayMs, final long recurMs, final Runnable func) {
    +        scheduleMs(delayMs, new Runnable() {
    --- End diff --
    
    nit: could we use a Java 8 lambda here instead?
    
    ```
    scheduleMs(delayMs, () -> {
        func.run();
        // This avoids a race condition with cancel-timer.
        scheduleMs(recurMs, this, true, 0);
    });
    ```


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159967636
  
    --- Diff: storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java ---
    @@ -24,50 +24,46 @@
     import org.apache.storm.task.GeneralTopologyContext;
     
     public class TupleImpl implements Tuple {
    -    private final List<Object> values;
    -    private final int taskId;
    -    private final String streamId;
    -    private final GeneralTopologyContext context;
    -    private final MessageId id;
    +    private List<Object> values;
    +    private int taskId;
    +    private String streamId;
    +    private GeneralTopologyContext context;
    +    private MessageId id;
    +    private final String srcComponent;
         private Long _processSampleStartTime;
         private Long _executeSampleStartTime;
         private long _outAckVal = 0;
    -    
    +
         public TupleImpl(Tuple t) {
             this.values = t.getValues();
             this.taskId = t.getSourceTask();
             this.streamId = t.getSourceStreamId();
             this.id = t.getMessageId();
             this.context = t.getContext();
    -        if (t instanceof TupleImpl) {
    +        this.srcComponent = t.getSourceComponent();
    +        try {
                 TupleImpl ti = (TupleImpl) t;
                 this._processSampleStartTime = ti._processSampleStartTime;
                 this._executeSampleStartTime = ti._executeSampleStartTime;
                 this._outAckVal = ti._outAckVal;
    +        } catch (ClassCastException e) {
    +            // ignore ... if t is not a TupleImpl type .. faster than checking and then casting
             }
         }
     
    -    public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
    +    public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId, MessageId id) {
             this.values = Collections.unmodifiableList(values);
             this.taskId = taskId;
             this.streamId = streamId;
             this.id = id;
             this.context = context;
    -        
    -        String componentId = context.getComponentId(taskId);
    -        Fields schema = context.getComponentOutputFields(componentId, streamId);
    -        if(values.size()!=schema.size()) {
    -            throw new IllegalArgumentException(
    -                    "Tuple created with wrong number of fields. " +
    -                    "Expected " + schema.size() + " fields but got " +
    -                    values.size() + " fields");
    -        }
    --- End diff --
    
    I know this slows things down, but it is a good sanity check. Would it be possible to make this check configurable, for example turned on in local mode? We would have to cache the setting, though, because just checking the conf might take as long as doing the check itself.
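    
    One way to cache such a flag is to read it once when the object is built, so the hot path only pays for a final field read. The sketch below assumes a hypothetical config key, `topology.tuple.validate.fields`, which is not an existing Storm setting. `TupleArityCheck` is also hypothetical. In local mode, the flag could simply default to on.
    
    ```java
    import java.util.List;
    import java.util.Map;
    
    /** Hypothetical: validates tuple arity only when a flag, read once from the conf, is on. */
    class TupleArityCheck {
        // Hypothetical config key, not an existing Storm setting.
        static final String VALIDATE_KEY = "topology.tuple.validate.fields";
    
        private final boolean enabled; // cached once, so each check is a single field read
    
        TupleArityCheck(Map<String, Object> conf) {
            this.enabled = Boolean.TRUE.equals(conf.get(VALIDATE_KEY));
        }
    
        void check(List<Object> values, int expectedFields) {
            if (enabled && values.size() != expectedFields) {
                throw new IllegalArgumentException(
                        "Tuple created with wrong number of fields. "
                        + "Expected " + expectedFields + " fields but got "
                        + values.size() + " fields");
            }
        }
    }
    ```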


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159969910
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
    @@ -79,6 +79,7 @@
         protected final long lowMemoryThresholdMB;
         protected final long mediumMemoryThresholdMb;
         protected final long mediumMemoryGracePeriodMs;
    +    private static int port = 5006;  // TODO: Roshan: remove this after stabilization
    --- End diff --
    
    Yes, please make sure this is removed, as leaving it in place is a security vulnerability.
      


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167326878
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -313,107 +330,74 @@ public void metricsTick(Task taskData, TupleImpl tuple) {
         protected void setupMetrics() {
             for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
                 StormTimer timerTask = workerData.getUserTimer();
    -            timerTask.scheduleRecurring(interval, interval, new Runnable() {
    -                @Override
    -                public void run() {
    -                    TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(interval),
    -                            (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
    -                    List<AddressedTuple> metricsTickTuple =
    -                            Lists.newArrayList(new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple));
    -                    receiveQueue.publish(metricsTickTuple);
    +            timerTask.scheduleRecurring(interval, interval,
    +                () -> {
    +                    TupleImpl tuple = new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID,
    +                        (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
    +                    AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
    +                    try {
    +                        receiveQueue.publish(metricsTickTuple);
    +                        receiveQueue.flush();  // avoid buffering
    +                    } catch (InterruptedException e) {
    +                        LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
    +                        Thread.currentThread().interrupt();
    +                        return;
    +                    }
                     }
    -            });
    -        }
    -    }
    -
    -    public void sendUnanchored(Task task, String stream, List<Object> values, ExecutorTransfer transfer) {
    -        Tuple tuple = task.getTuple(stream, values);
    -        List<Integer> tasks = task.getOutgoingTasks(stream, values);
    -        for (Integer t : tasks) {
    -            transfer.transfer(t, tuple);
    -        }
    -    }
    -
    -    /**
    -     * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api).
    -     */
    -    public void sendToEventLogger(Executor executor, Task taskData, List values,
    -                                  String componentId, Object messageId, Random random) {
    -        Map<String, DebugOptions> componentDebug = executor.getStormComponentDebug().get();
    -        DebugOptions debugOptions = componentDebug.get(componentId);
    -        if (debugOptions == null) {
    -            debugOptions = componentDebug.get(executor.getStormId());
    -        }
    -        double spct = ((debugOptions != null) && (debugOptions.is_enable())) ? debugOptions.get_samplingpct() : 0;
    -        if (spct > 0 && (random.nextDouble() * 100) < spct) {
    -            sendUnanchored(taskData, StormCommon.EVENTLOGGER_STREAM_ID,
    -                    new Values(componentId, messageId, System.currentTimeMillis(), values),
    -                    executor.getExecutorTransfer());
    +            );
             }
         }
     
    -    public void reflectNewLoadMapping(LoadMapping loadMapping) {
    -        for (LoadAwareCustomStreamGrouping g : groupers) {
    -            g.refreshLoad(loadMapping);
    -        }
    -    }
    -
    -    private void registerBackpressure() {
    -        receiveQueue.registerBackpressureCallback(new DisruptorBackpressureCallback() {
    -            @Override
    -            public void highWaterMark() throws Exception {
    -                LOG.debug("executor " + executorId + " is congested, set backpressure flag true");
    -                WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
    -            }
    -
    -            @Override
    -            public void lowWaterMark() throws Exception {
    -                LOG.debug("executor " + executorId + " is not-congested, set backpressure flag false");
    -                WorkerBackpressureThread.notifyBackpressureChecker(workerData.getBackpressureTrigger());
    -            }
    -        });
    -        receiveQueue.setHighWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    -        receiveQueue.setLowWaterMark(ObjectReader.getDouble(topoConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    -        receiveQueue.setEnableBackpressure(ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false));
    -    }
    -
         protected void setupTicks(boolean isSpout) {
             final Integer tickTimeSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS), null);
    -        boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
             if (tickTimeSecs != null) {
    +            boolean enableMessageTimeout = (Boolean) topoConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
                 if ((!Acker.ACKER_COMPONENT_ID.equals(componentId) && Utils.isSystemId(componentId))
                     || (!enableMessageTimeout && isSpout)) {
    -                LOG.info("Timeouts disabled for executor {}:{}", componentId, executorId);
    +                LOG.info("Timeouts disabled for executor " + componentId + ":" + executorId);
    --- End diff --
    
    nit: why did we go back to String concatenation?
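    
    Parameterized logging defers building the message until the logger knows the level is enabled. Concatenation always builds the string, even when the level is disabled. The sketch below uses `java.util.logging` (with `{0}` placeholders) as a self-contained stand-in for the SLF4J `{}` placeholders used in the original line. `CountingArg` and `LogDemo` are hypothetical.
    
    ```java
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    /** Counts how often it is rendered to a String. */
    class CountingArg {
        int renders = 0;
    
        @Override
        public String toString() {
            renders++;
            return "executor-1";
        }
    }
    
    class LogDemo {
        // Static field keeps a strong reference so the configured logger is not GC'd.
        static final Logger LOG = Logger.getLogger("log-demo");
    
        static {
            LOG.setUseParentHandlers(false); // keep the demo quiet
            LOG.setLevel(Level.WARNING);     // INFO is disabled for this logger
        }
    
        static void parameterized(CountingArg id) {
            // Level is checked first; the argument is never formatted when INFO is off.
            LOG.log(Level.INFO, "Timeouts disabled for executor {0}", id);
        }
    
        static void concatenated(CountingArg id) {
            // The message string (and id.toString()) is built before the level check.
            LOG.info("Timeouts disabled for executor " + id);
        }
    }
    ```
    
    Since INFO is normally enabled, the runtime difference for this particular line is small. The main argument is consistency with the parameterized style used elsewhere.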


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    I want to clarify that my performance tests on the latest code were fairly simple.  I did more exhaustive performance tests on an older version of the code that looked good to me, but I didn't save any of the numbers.
    
    I am planning to merge this in shortly, once I verify that all of the unit tests still pass. However, I think we should try, in a follow-on JIRA, to reproduce the increased CPU utilization that @HeartSaVioR saw.


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    I suspect this issue won’t show up on the setups I am using and will need to be triaged on your setup itself. I will work with you offline on that.
    
    BTW, what version of Java are you using?


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r161356532
  
    --- Diff: docs/Performance.md ---
    @@ -0,0 +1,132 @@
    +---
    --- End diff --
    
    I am putting a link to this doc from Concepts.md. If you have a better place in mind, please let me know.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r161358493
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -155,134 +150,159 @@ public void start() throws Exception {
     
             Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
                 @Override public Object run() throws Exception {
    -                workerState =
    -                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
    +                return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials);
    +            }
    +        }); // Subject.doAs(...)
    +
    +    }
    +
    +    private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, String> initCreds, Credentials initialCredentials)
    +            throws Exception {
    +        workerState =
    +                new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
                             stormClusterState);
     
    -                // Heartbeat here so that worker process dies if this fails
    -                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    -                // that worker is running and moves on
    -                doHeartBeat();
    +        // Heartbeat here so that worker process dies if this fails
    +        // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    +        // that worker is running and moves on
    +        doHeartBeat();
     
    -                executorsAtom = new AtomicReference<>(null);
    +        executorsAtom = new AtomicReference<>(null);
     
    -                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    -                // to the supervisor
    -                workerState.heartbeatTimer
    -                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    -                        try {
    -                            doHeartBeat();
    -                        } catch (IOException e) {
    -                            throw new RuntimeException(e);
    -                        }
    -                    });
    +        // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    +        // to the supervisor
    +        workerState.heartbeatTimer
    +                .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                    try {
    +                        doHeartBeat();
    +                    } catch (IOException e) {
    +                        throw new RuntimeException(e);
    +                    }
    +                });
     
    -                workerState.executorHeartbeatTimer
    -                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +        workerState.executorHeartbeatTimer
    +                .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
                             Worker.this::doExecutorHeartbeats);
     
    -                workerState.registerCallbacks();
    +        workerState.registerCallbacks();
     
    -                workerState.refreshConnections(null);
    +        workerState.refreshConnections(null);
     
    -                workerState.activateWorkerWhenAllConnectionsReady();
    +        workerState.activateWorkerWhenAllConnectionsReady();
     
    -                workerState.refreshStormActive(null);
    +        workerState.refreshStormActive(null);
     
    -                workerState.runWorkerStartHooks();
    +        workerState.runWorkerStartHooks();
     
    -                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    -                for (List<Long> e : workerState.getExecutors()) {
    -                    if (ConfigUtils.isLocalMode(topologyConf)) {
    -                        newExecutors.add(
    -                            LocalExecutor.mkExecutor(workerState, e, initCreds)
    -                                .execute());
    -                    } else {
    -                        newExecutors.add(
    -                            Executor.mkExecutor(workerState, e, initCreds)
    -                                .execute());
    -                    }
    -                }
    -                executorsAtom.set(newExecutors);
    +        List<Executor> execs = new ArrayList<>();
    +        for (List<Long> e : workerState.getExecutors()) {
    +            if (ConfigUtils.isLocalMode(topologyConf)) {
    +                Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
    +                execs.add( executor );
    +                workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
    +            } else {
    +                Executor executor = Executor.mkExecutor(workerState, e, initCreds);
    +                workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
    +                execs.add(executor);
    +            }
    +        }
     
    -                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
    -                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +        List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    +        for (Executor executor : execs) {
    +            newExecutors.add(executor.execute());
    +        }
    +        executorsAtom.set(newExecutors);
     
    -                // This thread will publish the messages destined for remote tasks to remote connections
    -                transferThread = Utils.asyncLoop(() -> {
    -                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    -                    return 0L;
    -                });
    +        // This thread will send out messages destined for remote tasks (on other workers)
    +        transferThread = workerState.makeTransferThread();
    +        transferThread.setName("Worker-Transfer");
     
    -                DisruptorBackpressureCallback disruptorBackpressureHandler =
    -                    mkDisruptorBackpressureHandler(workerState);
    -                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    -                workerState.transferQueue
    -                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    -                workerState.transferQueue
    -                    .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    -                workerState.transferQueue
    -                    .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    -
    -                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
    -                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
    -                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    -                    backpressureThread.start();
    -                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
    -                    
    -                    int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
    -                    workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
    -                }
    +        credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
     
    -                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
    +        establishLogSettingCallback();
     
    -                establishLogSettingCallback();
    +        workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
     
    -                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
    +        workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
    +                    @Override public void run() {
    +                        checkCredentialsChanged();
    +                    }
    +                });
     
    -                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
    -                        @Override public void run() {
    -                            checkCredentialsChanged();
    -                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    -                               checkThrottleChanged();
    -                            }
    +        workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    +                (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
    +                    @Override public void run() {
    +                        try {
    +                            LOG.debug("Checking if blobs have updated");
    +                            updateBlobUpdates();
    +                        } catch (IOException e) {
    +                            // IOException from reading the version files to be ignored
    +                            LOG.error(e.getStackTrace().toString());
                             }
    -                    });
    -
    -                workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    -                        (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
    -                            @Override public void run() {
    -                                try {
    -                                    LOG.debug("Checking if blobs have updated");
    -                                    updateBlobUpdates();
    -                                } catch (IOException e) {
    -                                    // IOException from reading the version files to be ignored
    -                                    LOG.error(e.getStackTrace().toString());
    -                                }
    -                            }
    -                        });
    -
    -                // The jitter allows the clients to get the data at different times, and avoids thundering herd
    -                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    -                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
    -                }
    +                    }
    +                });
    +
    +        // The jitter allows the clients to get the data at different times, and avoids thundering herd
    +        if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    +            workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
    +        }
    +
    +        workerState.refreshConnectionsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
     
    -                workerState.refreshConnectionsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
    +        workerState.resetLogLevelsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
     
    -                workerState.resetLogLevelsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
    +        workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    +                workerState::refreshStormActive);
     
    -                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    -                    workerState::refreshStormActive);
    +        setupFlushTupleTimer(topologyConf, newExecutors);
    +        setupBackPressureCheckTimer(topologyConf);
    +
    +        LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    +        LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
    +        return this;
    +    }
     
    -                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    -                LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
    -                return this;
    -            };
    +    private void setupFlushTupleTimer(final Map<String, Object> topologyConf, final List<IRunningExecutor> executors) {
    +        final Integer producerBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
    +        final Integer xferBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_TRANSFER_BATCH_SIZE));
    +        final Long flushIntervalMillis = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BATCH_FLUSH_INTERVAL_MILLIS));
    +        if ((producerBatchSize == 1 && xferBatchSize == 1) || flushIntervalMillis == 0) {
    +            LOG.info("Flush Tuple generation disabled. producerBatchSize={}, xferBatchSize={}, flushIntervalMillis={}", producerBatchSize, xferBatchSize, flushIntervalMillis);
    +            return;
    +        }
    +
    +        workerState.flushTupleTimer.scheduleRecurringMs(flushIntervalMillis, flushIntervalMillis, new Runnable() {
    +            @Override
    +            public void run() {
    +                // send flush tuple to all executors
    +                for (int i = 0; i < executors.size(); i++) {
    +                    IRunningExecutor exec = executors.get(i);
    +                    if (exec.getExecutorId().get(0) != Constants.SYSTEM_TASK_ID) {
    +                        exec.getExecutor().publishFlushTuple();
    +                    }
    +                }
    +            }
             });
    +        LOG.info("Flush tuple will be generated every {} millis", flushIntervalMillis);
    +    }
     
    +    private void setupBackPressureCheckTimer(final Map<String, Object> topologyConf) {
    +        final Integer workerCount = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_WORKERS));
    +        if (workerCount <= 1) {
    +            LOG.info("BackPressure change checking is disabled as there is only one worker");
    +            return;
    +        }
    +        final Long bpCheckIntervalMs = ObjectReader.getLong(topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_CHECK_MILLIS));
    +        workerState.backPressureCheckTimer.scheduleRecurringMs(bpCheckIntervalMs, bpCheckIntervalMs, new Runnable() {
    --- End diff --
    
    fixed
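    For reference, the two early-return checks in the diff above can be isolated into plain predicates. This is a minimal sketch: the class and method names (`FlushTimerPolicy`, `isFlushTupleEnabled`, `isBackPressureCheckEnabled`) are hypothetical and not part of Storm; only the conditions are taken from `setupFlushTupleTimer()` and `setupBackPressureCheckTimer()`.

    ```java
    // Hypothetical helper; only the conditions mirror the Worker diff above.
    final class FlushTimerPolicy {
        private FlushTimerPolicy() {
        }

        // Flush tuples are skipped when neither the producer nor the transfer path
        // batches (both batch sizes are 1), or when the flush interval is 0.
        static boolean isFlushTupleEnabled(int producerBatchSize, int xferBatchSize, long flushIntervalMillis) {
            boolean noBatching = producerBatchSize == 1 && xferBatchSize == 1;
            return !(noBatching || flushIntervalMillis == 0);
        }

        // Backpressure change checking is skipped when the topology has a single worker.
        static boolean isBackPressureCheckEnabled(int workerCount) {
            return workerCount > 1;
        }
    }
    ```

    Keeping the conditions as pure functions like this makes them easy to unit test without starting any timers.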


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r168046999
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
    @@ -22,19 +22,23 @@
     import org.apache.storm.executor.TupleInfo;
     import org.apache.storm.spout.ISpout;
     import org.apache.storm.spout.ISpoutOutputCollector;
    +import org.apache.storm.tuple.AddressedTuple;
     import org.apache.storm.tuple.MessageId;
     import org.apache.storm.tuple.TupleImpl;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.apache.storm.utils.MutableLong;
     import org.apache.storm.utils.RotatingMap;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.util.ArrayList;
     import java.util.List;
     import java.util.Random;
     
    +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
    --- End diff --
    
    Yes, that's the intent.
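    The contract stated in that class comment (one instance per thread, or else external synchronization) can be illustrated with a small sketch. `UnsafeCollector` and `CollectorUsage` are hypothetical stand-ins, not `SpoutOutputCollectorImpl`; the collector simply has no internal locking.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical non-thread-safe collector: no internal locking at all.
    class UnsafeCollector {
        private final List<Object> pending = new ArrayList<>();

        void emit(Object tuple) {
            pending.add(tuple);
        }

        int size() {
            return pending.size();
        }
    }

    class CollectorUsage {
        // Option 1: each thread gets its own instance, so no locking is needed.
        static final ThreadLocal<UnsafeCollector> PER_THREAD = ThreadLocal.withInitial(UnsafeCollector::new);

        // Option 2: threads share one instance and synchronize externally on it.
        static void emitShared(UnsafeCollector shared, Object tuple) {
            synchronized (shared) {
                shared.emit(tuple);
            }
        }
    }
    ```

    Option 1 avoids lock contention entirely, which fits the per-executor-thread usage the comment describes; option 2 is the fallback when an instance must be shared.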


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167287393
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
    @@ -40,109 +40,117 @@
     import org.apache.storm.nimbus.NimbusInfo;
     
     public interface IStormClusterState {
    -    public List<String> assignments(Runnable callback);
    +    List<String> assignments(Runnable callback);
     
    -    public Assignment assignmentInfo(String stormId, Runnable callback);
    +    Assignment assignmentInfo(String stormId, Runnable callback);
     
    -    public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
    +    VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
     
    -    public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
    +    Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
     
    -    public List<String> blobstoreInfo(String blobKey);
    +    List<String> blobstoreInfo(String blobKey);
     
    -    public List<NimbusSummary> nimbuses();
    +    List<NimbusSummary> nimbuses();
     
    -    public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
    +    void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
     
    -    public List<String> activeStorms();
    +    List<String> activeStorms();
     
         /**
          * Get a storm base for a topology
          * @param stormId the id of the topology
          * @param callback something to call if the data changes (best effort)
          * @return the StormBase or null if it is not alive.
          */
    -    public StormBase stormBase(String stormId, Runnable callback);
    +    StormBase stormBase(String stormId, Runnable callback);
     
    -    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
    +    ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
     
    -    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
    +    List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
     
    -    public List<ProfileRequest> getTopologyProfileRequests(String stormId);
    +    List<ProfileRequest> getTopologyProfileRequests(String stormId);
     
    -    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
    +    void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
     
    -    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
    +    void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
     
    -    public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
    +    Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
     
    -    public List<String> supervisors(Runnable callback);
    +    List<String> supervisors(Runnable callback);
     
    -    public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
    +    SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
     
    -    public void setupHeatbeats(String stormId);
    +    void setupHeatbeats(String stormId);
     
    -    public void teardownHeartbeats(String stormId);
    +    void teardownHeartbeats(String stormId);
     
    -    public void teardownTopologyErrors(String stormId);
    +    void teardownTopologyErrors(String stormId);
     
    -    public List<String> heartbeatStorms();
    +    List<String> heartbeatStorms();
     
    -    public List<String> errorTopologies();
    +    List<String> errorTopologies();
     
    -    public List<String> backpressureTopologies();
    +    /** @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */
    --- End diff --
    
    Could you file a JIRA for us to remove it in 3.x?
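    For the method being retired, one common way to mark it so callers also get a compiler warning is to pair the standard Javadoc form `@deprecated <text>` with the `@Deprecated` annotation. A sketch, where the interface name `ClusterStateSketch` is hypothetical and only the method signature and note come from the diff:

    ```java
    import java.util.List;

    // Hypothetical interface name; the method mirrors IStormClusterState.backpressureTopologies().
    interface ClusterStateSketch {
        /**
         * @deprecated In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.
         */
        @Deprecated
        List<String> backpressureTopologies();
    }
    ```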


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159954450
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java ---
    @@ -47,7 +47,6 @@
         public static final String STORMS_ROOT = "storms";
         public static final String SUPERVISORS_ROOT = "supervisors";
         public static final String WORKERBEATS_ROOT = "workerbeats";
    -    public static final String BACKPRESSURE_ROOT = "backpressure";
    --- End diff --
    
    To support running older topology versions under a newer 2.x Nimbus, etc., we should keep the basic setup and cleanup of the backpressure nodes in ZooKeeper, at least until a 3.x release.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r161357014
  
    --- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java ---
    @@ -193,6 +210,24 @@ public void run() {
             });
         }
     
    +    /**
    +     * Schedule a function to run recurrently
    +     * @param delayMs the number of millis to delay before running the function
    +     * @param recurMs the time between each invocation
    +     * @param func the function to run
    +     */
    +    public void scheduleRecurringMs(long delayMs, final long recurMs, final Runnable func) {
    +        scheduleMs(delayMs, new Runnable() {
    --- End diff --
    
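    This doesn't seem feasible due to the `this` reference inside the anonymous `Runnable`: in a lambda, `this` would refer to the enclosing instance rather than to the `Runnable` itself.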
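    A minimal sketch of why `this` matters here, assuming the recurring task re-schedules itself after each run. The names `RecurringSketch`, `schedule` and `drain` are hypothetical, and this single-threaded queue is far simpler than `StormTimer`. Inside an anonymous `Runnable`, `this` is the `Runnable`, so it can pass itself back to the scheduler; inside a lambda, `this` is the enclosing object, and the lambda cannot refer to itself that way.

    ```java
    import java.util.ArrayDeque;
    import java.util.Queue;

    // Hypothetical, single-threaded stand-in for a timer; not StormTimer's implementation.
    class RecurringSketch {
        private final Queue<Runnable> pending = new ArrayDeque<>();

        void schedule(Runnable task) {
            pending.add(task);
        }

        void scheduleRecurring(final Runnable func, final int maxRuns) {
            schedule(new Runnable() {
                private int count = 0; // runs of this particular task

                @Override
                public void run() {
                    func.run();
                    count++;
                    if (count < maxRuns) {
                        // 'this' is the anonymous Runnable, so it can re-schedule itself.
                        // In a lambda, 'this' would be the RecurringSketch instance and
                        // schedule(this) would not compile.
                        schedule(this);
                    }
                }
            });
        }

        // Runs queued tasks until none remain.
        void drain() {
            Runnable task;
            while ((task = pending.poll()) != null) {
                task.run();
            }
        }
    }
    ```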


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r168026607
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import org.apache.storm.serialization.KryoValuesDeserializer;
    +import org.apache.storm.serialization.KryoValuesSerializer;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
    +public class BackPressureStatus {
    +    public static final short IDENTIFIER = (short)-600;
    +    private static final int SIZE_OF_ID = 2; // size if IDENTIFIER
    +    private static final int SIZE_OF_INT = 4;
    +
    +    private static AtomicLong bpCount = new AtomicLong(0);
    +
    +    public String workerId;
    +    public final long id;                       // monotonically increasing id
    --- End diff --
    
    It's only for debugging purposes, so that we can correlate sent and received messages. I have used it to measure the latency involved in transmitting BackPressureStatus messages.
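    A sketch of how a monotonically increasing id lets the send and receive sides be matched up to measure latency. `StatusMsg` and `LatencyTracker` are hypothetical names; the only part that mirrors `BackPressureStatus` is drawing the id from a static `AtomicLong`. The tracker measures within one JVM using `System.nanoTime()`; across workers on different hosts, the two timestamps would need comparable clocks, which this sketch does not address.

    ```java
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical message type; only the id scheme mirrors BackPressureStatus.
    final class StatusMsg {
        private static final AtomicLong COUNTER = new AtomicLong(0);
        final long id;

        StatusMsg() {
            this.id = COUNTER.incrementAndGet(); // thread-safe, strictly increasing
        }
    }

    final class LatencyTracker {
        private final Map<Long, Long> sentAtNanos = new ConcurrentHashMap<>();

        void onSend(StatusMsg msg) {
            sentAtNanos.put(msg.id, System.nanoTime());
        }

        // Returns send-to-receive latency in nanoseconds, or -1 if the id was never recorded.
        long onReceive(StatusMsg msg) {
            Long sent = sentAtNanos.remove(msg.id);
            return sent == null ? -1 : System.nanoTime() - sent;
        }
    }
    ```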


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167387609
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import org.apache.storm.serialization.KryoValuesDeserializer;
    +import org.apache.storm.serialization.KryoValuesSerializer;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
    +public class BackPressureStatus implements java.io.Serializable {
    --- End diff --
    
    https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java#L66-L74


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    Here are my test results: TVL with a rate of 85,000 and max spout pending of 5,000. The numbers of spouts, splitters, and counters are each set equal to the worker count.
    
    Again, this is only to check whether there is a clear performance regression in TVL, which we have been using. A full analysis like the [Google doc in STORM-2306](https://docs.google.com/document/d/1A5k41UjVFY8jZg01BHc1fFxmI0AcFz5jRiKhNjhOj7I/edit?usp=sharing) would require a lot of effort and resources (dedicated machines). It would be great to update those numbers with the latest master vs. the latest STORM-2306 patch, but it should be OK to postpone the measurements until just before releasing Storm 2.0.0, comparing against the latest Storm 1.x release.
    
    > 4 workers
    
    >> master (ab7b4ca)
    
    1.
    
    ```
    (s)  rate(tuple/s)       mean(ms)     99%ile(ms)   99.9%ile(ms)          cores        mem(MB)         failed
      0     56,660.500      1,482.288      3,808.428      3,942.646          6.554        329.044              0
     30     82,245.581         10.817         27.607         50.430          5.093        364.626              0
     61     85,022.500         10.495         21.316         36.405          5.192        242.701              0
     91     84,994.733         10.365         19.186         29.688          5.152        399.110              0
    121     85,008.333         10.385         19.595         29.016          5.195        269.351              0
    151     84,999.867         10.367         19.005         29.000          5.156        361.585              0
    181     85,006.133         10.361         18.285         24.953          5.215        319.568              0
    211     85,007.433         10.373         18.547         26.018          5.227        374.250              0
    241     84,978.300         10.390         19.153         29.852          5.107        240.743              0
    271     85,025.733         10.351         18.366         28.934          5.134        308.930              0
    ```
    
    2.
    
    ```
    (s)  rate(tuple/s)       mean(ms)     99%ile(ms)   99.9%ile(ms)          cores        mem(MB)         failed
      0     24,746.933      2,497.621      3,498.050      3,571.450          4.175        480.104              0
     30     88,589.100        160.265      1,622.147      1,778.385          5.883        329.437              0
     60     85,012.833         10.494         20.120         31.326          5.211        421.895              0
     90     84,993.867         10.394         18.956         26.132          5.185        434.169              0
    120     85,008.100         10.474         20.201         31.523          5.194        440.354              0
    150     85,001.067         10.396         18.776         28.574          5.238        449.158              0
    180     85,005.633         10.477         20.283         35.127          5.289        441.979              0
    210     84,994.833         10.390         18.678         29.295          5.136        419.486              0
    240     85,011.167         10.370         18.219         26.051          5.219        308.977              0
    270     85,007.600         10.438         19.317         27.804          5.158        403.509              0
    ```
    
    3.
    
    ```
    (s)  rate(tuple/s)       mean(ms)     99%ile(ms)   99.9%ile(ms)          cores        mem(MB)         failed
      0     56,648.833      1,368.022      3,565.158      3,682.599          6.476        405.396              0
     30     85,045.067         10.725         23.446         44.007          5.249        448.851              0
     60     85,002.400         10.527         19.874         28.901          5.209        452.087              0
     90     84,997.800         10.412         19.694         34.341          5.136        356.300              0
    120     84,995.033         10.451         20.267         29.802          5.174        379.923              0
    150     85,019.533         10.413         19.153         30.556          5.193        398.516              0
    180     85,001.367         10.371         18.448         25.182          5.191        331.490              0
    210     85,004.400         10.386         18.874         24.101          5.195        368.889              0
    240     84,991.367         10.368         18.416         25.018          5.132        392.701              0
    270     85,004.367         10.469         19.808         35.586          5.147        443.393              0
    ```
    
    >> STORM-2306 (5b54809)
    
    1.
    
    ```
    (s)  rate(tuple/s)       mean(ms)     99%ile(ms)   99.9%ile(ms)          cores        mem(MB)         failed
      0     28,098.000      1,549.653      3,183.477      3,227.517          4.041        192.208              0
     30     85,306.133         13.650        270.795        406.585          8.958        275.171              0
     60     85,007.067          5.466         17.449         28.836          9.023        287.177              0
     90     92,084.333          5.483         17.269         26.280          9.004        212.465              0
    120     84,999.400          5.561         17.662         29.458          9.739        368.272              0
    150     85,005.367          5.593         17.449         37.126          8.979        293.334              0
    180     92,082.733          5.584         18.219         28.787          8.981        334.805              0
    210     85,008.000          5.571         18.022         29.213          8.962        363.184              0
    240     84,992.867          5.604         17.662         39.191          9.020        271.483              0
    270     92,094.467          5.590         17.351         24.953          8.939        320.613              0
    ```
    
    2.
    
    ```
    (s)  rate(tuple/s)       mean(ms)     99%ile(ms)   99.9%ile(ms)          cores        mem(MB)         failed
      0     56,779.467        728.445      2,967.470      3,066.036          7.032        308.975              0
     30     84,986.933          5.851         21.037         33.489          9.101        404.124              0
     60     85,021.833          5.593         18.301         29.049          9.104        377.985              0
     90     85,000.500          5.577         17.465         26.853          9.101        343.799              0
    120     85,009.300          5.714         18.661         29.196          9.084        252.083              0
    150     84,998.533          5.532         16.957         26.460          9.114        252.234              0
    180     85,005.300          5.588         16.810         26.165          9.102        227.976              0
    210     84,997.633          5.708         19.055         32.096          9.157        294.213              0
    240     84,994.433          5.692         18.481         30.261          9.046        365.903              0
    270     85,015.500          5.590         18.022         28.197          9.080        401.871              0
    ```
    
    3.
    
    ```
    (s)  rate(tuple/s)       mean(ms)     99%ile(ms)   99.9%ile(ms)          cores        mem(MB)         failed
      0     34,802.167      1,451.699      3,227.517      3,275.751          4.887        358.051              0
     30     99,803.667         26.173        740.295        880.804          9.226        349.733              0
     60     92,079.433          5.790         20.398         34.800         11.521        254.316              0
     90     85,002.100          5.590         17.220         27.361          9.218        333.207              0
    120     84,998.033          5.750         18.432         28.066          9.179        325.581              0
    150     84,993.500          5.745         18.792         29.966          9.190        323.914              0
    180     85,011.500          5.828         17.269         25.444          9.217        352.495              0
    210     85,005.200          5.761         18.514         32.899          9.244        338.353              0
    240     84,983.900          5.756         20.546         33.374          9.165        346.788              0
    270     85,018.133          5.516         17.138         32.915          9.206        404.615              0
    ```
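    To compare runs like these, it can help to average a column over the steady-state rows only, since the first sample or two include warm-up. Below is a hypothetical helper (not part of TVL or Storm) that parses the whitespace-separated output shown above, strips thousands separators, and skips a configurable number of warm-up rows.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical helper for TVL-style output. Column indices:
    // 0 = time(s), 1 = rate, 2 = mean(ms), 3 = 99%ile, 4 = 99.9%ile, 5 = cores, 6 = mem(MB), 7 = failed.
    final class TvlSummary {
        private TvlSummary() {
        }

        static double averageColumn(String output, int column, int warmupRows) {
            List<Double> values = new ArrayList<>();
            int dataRow = 0;
            for (String line : output.split("\n")) {
                String trimmed = line.trim();
                if (trimmed.isEmpty() || trimmed.startsWith("(s)")) {
                    continue; // skip blank lines and the header row
                }
                if (dataRow++ < warmupRows) {
                    continue; // skip warm-up samples
                }
                String[] cols = trimmed.split("\\s+");
                values.add(Double.parseDouble(cols[column].replace(",", "")));
            }
            if (values.isEmpty()) {
                throw new IllegalArgumentException("no data rows after warm-up");
            }
            double sum = 0;
            for (double v : values) {
                sum += v;
            }
            return sum / values.size();
        }
    }
    ```

    For example, `TvlSummary.averageColumn(output, 2, 1)` averages `mean(ms)` while ignoring the row at time 0.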
    
    > 1 worker
    
    >> master (ab7b4ca)
    
    1.
    
    ```
    (s)  rate(tuple/s)       mean(ms)     99%ile(ms)   99.9%ile(ms)          cores        mem(MB)         failed
      0     56,667.633         96.401        944.767        972.030          2.220        115.121              0
     30     85,012.833          6.684         21.283         27.427          2.616         27.433              0
     60     85,003.867          6.715         22.315         28.967          2.600         48.685              0
     90     85,009.633          7.003         24.855         30.261          2.650         56.965              0
    120     85,008.167          6.765         22.331         35.750          2.651         75.872              0
    150     84,997.767          6.788         22.430         31.998          2.597         99.166              0
    180     85,004.633          6.658         21.905         26.755          2.574        123.478              0
    210     85,011.633          6.661         21.070         26.460          2.573         34.217              0
    240     84,961.500          6.658         21.955         29.606          2.593         38.743              0
    270     85,062.533          6.715         22.790         28.492          2.597         76.703              0
    ```
    
    2.
    
    ```
    (s)  rate(tuple/s)       mean(ms)     99%ile(ms)   99.9%ile(ms)          cores        mem(MB)         failed
      0     56,666.067        100.076        956.301        968.884          2.289         90.811              0
     30     84,985.100          6.886         24.461         30.654          2.644         43.144              0
     60     85,033.667          6.738         21.725         30.786          2.613         44.536              0
     90     85,006.833          6.827         24.183         28.361          2.636        123.622              0
    120     85,003.100          6.767         21.807         27.967          2.700        111.695              0
    150     85,007.800          6.564         21.365         33.194          2.574        104.018              0
    180     85,004.067          7.089         24.527         29.688          2.669         80.106              0
    210     85,009.400          6.673         21.348         27.853          2.617         53.960              0
    240     84,999.367          7.159         24.216         29.131          2.668        134.561              0
    270     85,064.100          7.168         24.773         28.393          2.670        115.301              0
    ```
    
    3.
    
    ```
    (s)  rate(tuple/s)       mean(ms)     99%ile(ms)   99.9%ile(ms)          cores        mem(MB)         failed
      0     56,651.067        104.108        983.040        999.293          2.245         94.063              0
     30     82,287.774          7.086         23.364         43.221          2.574        106.583              0
     61     85,002.000          6.953         23.396         29.213          2.642        113.673              0
     91     85,013.500          6.913         22.839         29.393          2.628        129.253              0
    121     85,008.667          6.775         22.348         31.982          2.650        153.746              0
    151     85,003.467          6.576         19.956         27.017          2.560         56.378              0
    181     85,015.300          6.837         22.692         31.408          2.607         79.616              0
    211     85,004.100          6.879         25.346         31.097          2.581         94.896              0
    241     85,005.867          6.891         23.609         29.278          2.594        116.510              0
    271     85,010.800          6.760         21.987         28.541          2.575        125.168              0
    ```
    
    >> STORM-2306 (5b54809)
    
    1.
    
    ```
    (s)  rate(tuple/s)       mean(ms)     99%ile(ms)   99.9%ile(ms)          cores        mem(MB)         failed
      0     56,666.333         18.022        330.564        346.030          2.079         46.405              0
     30     85,017.400          3.748         20.791         24.805          2.686        107.791              0
     60     85,020.833          3.466         20.120         24.068          2.679         46.481              0
     90     85,009.833          3.470         19.988         23.118          2.639         63.486              0
    120     85,007.600          3.676         21.250         26.542          2.708         76.022              0
    150     85,006.733          3.395         20.283         23.904          2.636         80.490              0
    180     85,009.333          3.632         21.086         26.231          2.657         78.360              0
    210     85,003.300          3.643         20.840         26.362          2.670         87.437              0
    240     85,007.733          3.735         21.545         25.788          2.686         98.075              0
    270     85,008.000          3.675         21.430         29.606          2.647         93.264              0
    ```
    
    2.
    
    ```
    (s)  rate(tuple/s)       mean(ms)     99%ile(ms)   99.9%ile(ms)          cores        mem(MB)         failed
      0     56,684.100         15.841        314.835        371.196          2.051         52.854              0
     30     84,998.900          3.343         20.464         24.134          2.624         95.248              0
     60     85,014.100          3.600         21.299         24.363          2.665         43.992              0
     90     85,000.533          3.539         20.726         23.839          2.650        100.354              0
    120     85,006.567          3.483         20.251         23.347          2.665         40.798              0
    150     84,992.467          3.562         20.791         26.001          2.666        102.680              0
    180     82,297.839          3.655         20.890         27.378          2.590         51.539              0
    211     85,007.333          3.567         21.299         26.640          2.650        107.193              0
    241     85,012.467          3.604         20.955         27.460          2.646         50.419              0
    271     85,001.333          3.288         19.726         24.543          2.604        103.526              0
    ```
    
    3.
    
    ```
    (s)  rate(tuple/s)       mean(ms)     99%ile(ms)   99.9%ile(ms)          cores        mem(MB)         failed
      0     56,680.200         14.695        299.893        340.263          2.096        100.239              0
     30     85,012.833          4.007         21.512         27.427          2.719         67.345              0
     60     85,003.900          3.921         22.233         31.228          2.709         43.670              0
     90     85,009.467          3.881         22.249         26.722          2.683         23.378              0
    120     85,027.600          3.955         21.840         25.772          2.717        108.810              0
    150     85,009.300          4.049         22.479         36.405          2.724         80.150              0
    180     85,005.933          3.381         20.283         23.478          2.639         43.224              0
    210     85,007.733          3.798         21.053         24.199          2.717        115.814              0
    240     85,007.800          4.275         22.266         26.608          2.751        106.709              0
    270     85,005.867          3.507         20.644         24.297          2.700         59.038              0
    ```
    
    Overall, this patch roughly halves the mean latency, whereas the 99%ile and 99.9%ile latencies are close to the baseline. It also shows that this patch consumes more CPU. The gap is fairly small with a single worker but becomes quite large with 4 workers. I might have made a mistake in testing, so it would be really nice if someone else also ran the multi-worker test and shared the results.
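
    The "half of mean latency" observation can be checked against the steady-state rows (t >= 30 s). The sketch below averages the `mean(ms)` and `99%ile(ms)` columns of the last table before the `>> STORM-2306 (5b54809)` results, which appears to be the baseline run, and of patch run 1. Choosing those two runs is an assumption made for illustration; the numbers are copied verbatim from the tables.

    ```java
    public class LatencyRatio {
        // Steady-state rows (t >= 30s): baseline table ending just before ">> STORM-2306", and patch run 1.
        static final double[] BASE_MEAN = {7.086, 6.953, 6.913, 6.775, 6.576, 6.837, 6.879, 6.891, 6.760};
        static final double[] PATCH_MEAN = {3.748, 3.466, 3.470, 3.676, 3.395, 3.632, 3.643, 3.735, 3.675};
        static final double[] BASE_P99 = {23.364, 23.396, 22.839, 22.348, 19.956, 22.692, 25.346, 23.609, 21.987};
        static final double[] PATCH_P99 = {20.791, 20.120, 19.988, 21.250, 20.283, 21.086, 20.840, 21.545, 21.430};

        // Arithmetic mean of one column.
        static double avg(double[] xs) {
            double sum = 0;
            for (double x : xs) {
                sum += x;
            }
            return sum / xs.length;
        }

        public static void main(String[] args) {
            System.out.printf("mean ratio (patch/base): %.3f%n", avg(PATCH_MEAN) / avg(BASE_MEAN));
            System.out.printf("p99 ratio  (patch/base): %.3f%n", avg(PATCH_P99) / avg(BASE_P99));
        }
    }
    ```

    With these rows the mean ratio comes out at about 0.53 and the 99%ile ratio at about 0.91, which matches the "half of mean latency, tail latencies close" reading.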
    
    Even if my test results are correct, I'm +1, because this patch introduces a better backpressure design; backpressure should have been enabled by default, but we had disabled it. We could file follow-up issues if there is something we should address.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r161358991
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex
             }
         }
     
    +    public Queue<AddressedTuple> getPendingEmits() {
    +        return pendingEmits;
    +    }
    +
         /**
          * separated from mkExecutor in order to replace executor transfer in executor data for testing
          */
         public ExecutorShutdown execute() throws Exception {
             LOG.info("Loading executor tasks " + componentId + ":" + executorId);
     
    -        registerBackpressure();
    -        Utils.SmartThread systemThreads =
    -                Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
    -
             String handlerName = componentId + "-executor" + executorId;
    -        Utils.SmartThread handlers =
    +        Utils.SmartThread handler =
                     Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
             setupTicks(StatsUtil.SPOUT.equals(type));
     
             LOG.info("Finished loading executor " + componentId + ":" + executorId);
    -        return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
    +        return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask);
         }
     
         public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
     
    -    @SuppressWarnings("unchecked")
         @Override
    -    public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
    -        ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
    -        for (AddressedTuple addressedTuple : addressedTuples) {
    -            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
    -            int taskId = addressedTuple.getDest();
    -            if (isDebug) {
    -                LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
    -            }
    +    public void accept(Object event) {
    +        if (event == JCQueue.INTERRUPT) {
    +            throw new RuntimeException(new InterruptedException("JCQ processing interrupted") );
    +        }
    +        AddressedTuple addressedTuple =  (AddressedTuple)event;
    +        int taskId = addressedTuple.getDest();
    +
    +        TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
    +        if (isDebug) {
    +            LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
    +        }
    +
    +        try {
                 if (taskId != AddressedTuple.BROADCAST_DEST) {
                     tupleActionFn(taskId, tuple);
                 } else {
                     for (Integer t : taskIds) {
                         tupleActionFn(t, tuple);
                     }
                 }
    +        } catch (Exception e) {
    +            throw new RuntimeException(e);
    --- End diff --
    
    It is caught by [Utils.asyncLoop()](https://github.com/roshannaik/storm/blob/STORM-2306-2/storm-client/src/jvm/org/apache/storm/utils/Utils.java#L351) of the BoltExecutor or SpoutExecutor thread.
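
    To illustrate how a `RuntimeException` thrown from `accept()` ends up in the executor thread's loop, here is a simplified, hypothetical stand-in for `Utils.asyncLoop` (it is not Storm's implementation). It follows the loop contract from the new `asyncLoop` diff in this thread (a `null` return stops the loop, a positive value sleeps that many milliseconds) and lets any exception escape `run()`, so it reaches the thread's `UncaughtExceptionHandler`.

    ```java
    import java.util.concurrent.Callable;
    import java.util.concurrent.atomic.AtomicReference;

    public class AsyncLoopSketch {
        // Hypothetical, simplified loop runner: calls fn repeatedly on a new thread.
        public static Thread asyncLoop(Callable<Long> fn, Thread.UncaughtExceptionHandler eh, String name) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        Long s = fn.call();
                        if (s == null) {
                            break; // a null return stops the loop
                        }
                        if (s > 0) {
                            Thread.sleep(s); // plain sleep only to keep the sketch self-contained
                        }
                    }
                } catch (RuntimeException e) {
                    throw e; // e.g. the RuntimeException wrapping a tupleActionFn failure
                } catch (Exception e) {
                    throw new RuntimeException(e); // checked exceptions from fn.call() or sleep()
                }
            }, name);
            t.setUncaughtExceptionHandler(eh); // escaped exceptions are delivered here
            t.start();
            return t;
        }

        public static void main(String[] args) throws InterruptedException {
            AtomicReference<Throwable> seen = new AtomicReference<>();
            Thread t = asyncLoop(() -> {
                // Plays the role of accept() rethrowing a processing failure.
                throw new RuntimeException(new IllegalStateException("tuple processing failed"));
            }, (thread, err) -> seen.set(err), "demo-executor");
            t.join();
            System.out.println("handler saw: " + seen.get().getCause().getMessage());
        }
    }
    ```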


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r168034925
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
    @@ -22,19 +22,23 @@
     import org.apache.storm.executor.TupleInfo;
     import org.apache.storm.spout.ISpout;
     import org.apache.storm.spout.ISpoutOutputCollector;
    +import org.apache.storm.tuple.AddressedTuple;
     import org.apache.storm.tuple.MessageId;
     import org.apache.storm.tuple.TupleImpl;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.apache.storm.utils.MutableLong;
     import org.apache.storm.utils.RotatingMap;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.util.ArrayList;
     import java.util.List;
     import java.util.Random;
     
    +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
    --- End diff --
    
    I thought STORM-2945 was filed to find a way to support background emits without external synchronization, so it may well remain unresolved in 2.x. If you intended STORM-2945 to document how to enable background emits in the current state, please ignore this comment.
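
    Under the "synchronize externally" note in the diff, a topology that emits from a background thread would need to serialize all emits on the shared collector. A minimal sketch of that idea, assuming a hypothetical `Emitter` interface as a stand-in for the real collector (it is not `ISpoutOutputCollector`):

    ```java
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class SyncEmitSketch {
        // Hypothetical minimal collector interface, for illustration only.
        public interface Emitter {
            void emit(List<Object> tuple);
        }

        // Deliberately non-thread-safe collector, standing in for a per-executor collector.
        public static class UnsafeCollector implements Emitter {
            final List<List<Object>> emitted = new ArrayList<>();

            public void emit(List<Object> tuple) {
                emitted.add(tuple);
            }
        }

        // Routes every emit through one lock so the executor thread and a background thread can share it.
        public static class SynchronizedEmitter implements Emitter {
            private final Emitter delegate;
            private final Object lock = new Object();

            public SynchronizedEmitter(Emitter delegate) {
                this.delegate = delegate;
            }

            public void emit(List<Object> tuple) {
                synchronized (lock) {
                    delegate.emit(tuple);
                }
            }
        }

        public static void main(String[] args) throws InterruptedException {
            UnsafeCollector collector = new UnsafeCollector();
            Emitter shared = new SynchronizedEmitter(collector);
            Runnable producer = () -> {
                for (int i = 0; i < 10_000; i++) {
                    shared.emit(Collections.<Object>singletonList(i));
                }
            };
            Thread executorThread = new Thread(producer);
            Thread backgroundThread = new Thread(producer);
            executorThread.start();
            backgroundThread.start();
            executorThread.join();
            backgroundThread.join();
            System.out.println(collector.emitted.size()); // 20000
        }
    }
    ```

    The cost is a lock acquisition on every emit, which is the kind of overhead STORM-2945 would presumably aim to avoid.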


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r161358833
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex
             }
         }
     
    +    public Queue<AddressedTuple> getPendingEmits() {
    +        return pendingEmits;
    +    }
    +
         /**
          * separated from mkExecutor in order to replace executor transfer in executor data for testing
          */
         public ExecutorShutdown execute() throws Exception {
             LOG.info("Loading executor tasks " + componentId + ":" + executorId);
     
    -        registerBackpressure();
    -        Utils.SmartThread systemThreads =
    -                Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
    -
             String handlerName = componentId + "-executor" + executorId;
    -        Utils.SmartThread handlers =
    +        Utils.SmartThread handler =
                     Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
             setupTicks(StatsUtil.SPOUT.equals(type));
     
             LOG.info("Finished loading executor " + componentId + ":" + executorId);
    -        return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
    +        return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask);
         }
     
         public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
     
    -    @SuppressWarnings("unchecked")
         @Override
    -    public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
    -        ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
    -        for (AddressedTuple addressedTuple : addressedTuples) {
    -            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
    -            int taskId = addressedTuple.getDest();
    -            if (isDebug) {
    -                LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
    -            }
    +    public void accept(Object event) {
    +        if (event == JCQueue.INTERRUPT) {
    +            throw new RuntimeException(new InterruptedException("JCQ processing interrupted") );
    --- End diff --
    
    That exception bubbles up to and gets caught by the Utils.asyncLoop in the thread that calls the recvQueue.consume() [i.e. a BoltExecutor or SpoutExecutor thread]
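
    The interrupt-by-sentinel flow can be sketched with standard library types. This is a hypothetical illustration: `LinkedBlockingQueue` stands in for `JCQueue`, `INTERRUPT` is a local sentinel playing the role of `JCQueue.INTERRUPT`, and `consume()` is a simplified stand-in for `recvQueue.consume()`. The callback throws, the exception escapes `consume()`, and the calling loop recognizes the `InterruptedException` cause.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.function.Consumer;

    public class InterruptSentinelSketch {
        // Hypothetical sentinel object, compared by identity like JCQueue.INTERRUPT in the diff.
        static final Object INTERRUPT = new Object();

        // Drains whatever is queued and feeds it to the consumer (stand-in for recvQueue.consume()).
        static int consume(LinkedBlockingQueue<Object> q, Consumer<Object> c) {
            List<Object> batch = new ArrayList<>();
            q.drainTo(batch);
            for (Object o : batch) {
                c.accept(o);
            }
            return batch.size();
        }

        // Returns true if the loop stopped because the sentinel was consumed.
        static boolean runLoop(LinkedBlockingQueue<Object> q, List<Object> processed) {
            Consumer<Object> accept = event -> {
                if (event == INTERRUPT) {
                    throw new RuntimeException(new InterruptedException("processing interrupted"));
                }
                processed.add(event);
            };
            try {
                while (true) {
                    consume(q, accept);
                }
            } catch (RuntimeException e) {
                // The exception bubbled out of consume() to the loop that called it.
                if (e.getCause() instanceof InterruptedException) {
                    return true;
                }
                throw e;
            }
        }

        public static void main(String[] args) {
            LinkedBlockingQueue<Object> q = new LinkedBlockingQueue<>();
            q.add("t1");
            q.add("t2");
            q.add(INTERRUPT);
            q.add("t3");
            List<Object> processed = new ArrayList<>();
            System.out.println(runLoop(q, processed) + " " + processed); // true [t1, t2]
        }
    }
    ```

    In this sketch, anything drained after the sentinel in the same batch (`t3` here) is not processed; whether JCQueue behaves the same way is not shown in this thread.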


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r160065976
  
    --- Diff: storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java ---
    @@ -24,50 +24,46 @@
     import org.apache.storm.task.GeneralTopologyContext;
     
     public class TupleImpl implements Tuple {
    -    private final List<Object> values;
    -    private final int taskId;
    -    private final String streamId;
    -    private final GeneralTopologyContext context;
    -    private final MessageId id;
    +    private List<Object> values;
    +    private int taskId;
    +    private String streamId;
    +    private GeneralTopologyContext context;
    +    private MessageId id;
    +    private final String srcComponent;
         private Long _processSampleStartTime;
         private Long _executeSampleStartTime;
         private long _outAckVal = 0;
    -    
    +
         public TupleImpl(Tuple t) {
             this.values = t.getValues();
             this.taskId = t.getSourceTask();
             this.streamId = t.getSourceStreamId();
             this.id = t.getMessageId();
             this.context = t.getContext();
    -        if (t instanceof TupleImpl) {
    +        this.srcComponent = t.getSourceComponent();
    +        try {
                 TupleImpl ti = (TupleImpl) t;
                 this._processSampleStartTime = ti._processSampleStartTime;
                 this._executeSampleStartTime = ti._executeSampleStartTime;
                 this._outAckVal = ti._outAckVal;
    +        } catch (ClassCastException e) {
    +            // ignore ... if t is not a TupleImpl type .. faster than checking and then casting
             }
         }
     
    -    public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
    +    public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId, MessageId id) {
             this.values = Collections.unmodifiableList(values);
             this.taskId = taskId;
             this.streamId = streamId;
             this.id = id;
             this.context = context;
    -        
    -        String componentId = context.getComponentId(taskId);
    -        Fields schema = context.getComponentOutputFields(componentId, streamId);
    -        if(values.size()!=schema.size()) {
    -            throw new IllegalArgumentException(
    -                    "Tuple created with wrong number of fields. " +
    -                    "Expected " + schema.size() + " fields but got " +
    -                    values.size() + " fields");
    -        }
    --- End diff --
    
    I think a map associating each (taskId, streamId) pair with its schema's size may help reduce the slowdown. I know we are trying to avoid map lookups, but it looks like an acceptable tradeoff to me if we still want to keep the sanity check.
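
    A minimal sketch of the suggested lookup, assuming the field counts are registered once up front (the class name and the `register` step are hypothetical; in Storm the counts would come from the topology context's declared output fields). Nested maps keyed by taskId and then streamId avoid allocating a pair object for every tuple. Treating an unknown (taskId, streamId) as an error is a choice made for this sketch.

    ```java
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SchemaSizeCache {
        // taskId -> (streamId -> number of declared output fields); built once, then only read.
        private final Map<Integer, Map<String, Integer>> sizes = new HashMap<>();

        // Hypothetical registration step, done before tuples start flowing.
        public void register(int taskId, String streamId, int fieldCount) {
            sizes.computeIfAbsent(taskId, k -> new HashMap<>()).put(streamId, fieldCount);
        }

        // Same check as the removed constructor code, backed by the precomputed sizes.
        public void check(int taskId, String streamId, List<Object> values) {
            Map<String, Integer> byStream = sizes.get(taskId);
            Integer expected = (byStream == null) ? null : byStream.get(streamId);
            if (expected == null) {
                throw new IllegalArgumentException("No schema for task " + taskId + " stream " + streamId);
            }
            if (values.size() != expected) {
                throw new IllegalArgumentException("Tuple created with wrong number of fields. Expected "
                    + expected + " fields but got " + values.size() + " fields");
            }
        }
    }
    ```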


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r163967849
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java ---
    @@ -47,7 +47,6 @@
         public static final String STORMS_ROOT = "storms";
         public static final String SUPERVISORS_ROOT = "supervisors";
         public static final String WORKERBEATS_ROOT = "workerbeats";
    -    public static final String BACKPRESSURE_ROOT = "backpressure";
    --- End diff --
    
    @revans2 can you clarify? 2.x will require topology jars to be rebuilt for it.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159956952
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import org.apache.storm.messaging.netty.BackPressureStatus;
    +import org.apache.storm.utils.JCQueue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import static org.apache.storm.Constants.SYSTEM_TASK_ID;
    +
    +public class BackPressureTracker {
    +    static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
    +
    +    private final Map<Integer, JCQueue> bpTasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
    +    private final Set<Integer> nonBpTasks = ConcurrentHashMap.newKeySet();
    +    private final String workerId;
    +
    +    public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
    +        this.workerId = workerId;
    +        this.nonBpTasks.addAll(allLocalTasks);    // all tasks are considered to be not under BP initially
    +        this.nonBpTasks.remove((int)SYSTEM_TASK_ID);   // not tracking system task
    +    }
    +
    +    /* called by transferLocalBatch() on NettyWorker thread
    +     * returns true if an update was recorded, false if taskId is already under BP
    +     */
    +    public boolean recordBackpressure(Integer taskId, JCQueue recvQ) {
    +        if (nonBpTasks.remove(taskId)) {
    +            bpTasks.put(taskId, recvQ);
    +            return true;
    +        }
    +        return false;
    +    }
    +
    +    // returns true if there was a change in the BP situation
    +    public boolean refreshBpTaskList() {
    +        boolean changed = false;
    +        LOG.debug("Running Back Pressure status change check");
    +        for (Iterator<Entry<Integer, JCQueue>> itr = bpTasks.entrySet().iterator(); itr.hasNext(); ) {
    +            Entry<Integer, JCQueue> entry = itr.next();
    +            if (entry.getValue().isEmptyOverflow()) {
    +                // move task from bpTasks to noBpTasks
    +                nonBpTasks.add(entry.getKey());
    +                itr.remove();
    --- End diff --
    
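    It looks like there is a race condition here with `recordBackpressure`. If `nonBpTasks.add(entry.getKey())` finishes, then a context switch lets `recordBackpressure` run to completion for the same taskId, and only then `itr.remove()` happens, we might end up in an inconsistent state: `recordBackpressure` has just moved the task out of `nonBpTasks`, and `itr.remove()` then drops it from `bpTasks`, so the task is tracked in neither set.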
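
    One straightforward way to rule out that interleaving is to make the two methods mutually exclusive. A minimal sketch, assuming a coarse lock is acceptable (it trades some concurrency for simplicity); `OverflowQueue` is a hypothetical stand-in for `JCQueue` exposing only `isEmptyOverflow()`, and the system-task exclusion from the original constructor is left out.

    ```java
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class SyncBackPressureTracker {
        // Hypothetical stand-in for JCQueue: only the method the tracker needs.
        public interface OverflowQueue {
            boolean isEmptyOverflow();
        }

        // Plain collections are enough because every access happens under this object's monitor.
        private final Map<Integer, OverflowQueue> bpTasks = new HashMap<>();
        private final Set<Integer> nonBpTasks = new HashSet<>();

        public SyncBackPressureTracker(List<Integer> allLocalTasks) {
            nonBpTasks.addAll(allLocalTasks); // all tasks start out not under backpressure
        }

        // Returns true if the task newly entered backpressure.
        public synchronized boolean recordBackpressure(Integer taskId, OverflowQueue recvQ) {
            if (nonBpTasks.remove(taskId)) {
                bpTasks.put(taskId, recvQ);
                return true;
            }
            return false;
        }

        // Moves tasks whose overflow has drained back to nonBpTasks; returns true if anything changed.
        public synchronized boolean refreshBpTaskList() {
            boolean changed = false;
            for (Iterator<Map.Entry<Integer, OverflowQueue>> itr = bpTasks.entrySet().iterator(); itr.hasNext(); ) {
                Map.Entry<Integer, OverflowQueue> entry = itr.next();
                if (entry.getValue().isEmptyOverflow()) {
                    nonBpTasks.add(entry.getKey());
                    itr.remove();
                    changed = true;
                }
            }
            return changed;
        }

        public synchronized boolean isUnderBackpressure(Integer taskId) {
            return bpTasks.containsKey(taskId);
        }
    }
    ```

    Because the add-to-`nonBpTasks` and the remove-from-`bpTasks` now happen atomically with respect to `recordBackpressure`, a task is always in exactly one of the two collections.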


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167097905
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
    @@ -17,72 +17,124 @@
      */
     package org.apache.storm.executor;
     
    -import com.google.common.annotations.VisibleForTesting;
    -import com.lmax.disruptor.EventHandler;
     import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
     import org.apache.storm.serialization.KryoTupleSerializer;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.tuple.Tuple;
    -import org.apache.storm.utils.DisruptorQueue;
    -import org.apache.storm.utils.MutableObject;
    +import org.apache.storm.utils.JCQueue;
     import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Utils;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.ArrayList;
    +import java.util.Collections;
     import java.util.Map;
    -import java.util.concurrent.Callable;
    +import java.util.Queue;
     
    -public class ExecutorTransfer implements EventHandler, Callable {
    +// Every executor has an instance of this class
    +public class ExecutorTransfer  {
         private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
     
         private final WorkerState workerData;
    -    private final DisruptorQueue batchTransferQueue;
    -    private final Map<String, Object> topoConf;
         private final KryoTupleSerializer serializer;
    -    private final MutableObject cachedEmit;
         private final boolean isDebug;
    +    private final int producerBatchSz;
    +    private int remotesBatchSz = 0;
    +    private int indexingBase = 0;
    +    private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker
    +    private ArrayList<JCQueue> queuesToFlush; // [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance
     
    -    public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map<String, Object> topoConf) {
    +
    +    public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
             this.workerData = workerData;
    -        this.batchTransferQueue = batchTransferQueue;
    -        this.topoConf = topoConf;
             this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
    -        this.cachedEmit = new MutableObject(new ArrayList<>());
             this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
    +        this.producerBatchSz = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
    +    }
    +
    +    // to be called after all Executor objects in the worker are created and before this object is used
    +    public void initLocalRecvQueues() {
    +        Integer minTaskId = workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
    +        this.localReceiveQueues = Utils.convertToArray( workerData.getLocalReceiveQueues(), minTaskId);
    +        this.indexingBase = minTaskId;
    +        this.queuesToFlush = new ArrayList<JCQueue>(Collections.nCopies(localReceiveQueues.size(), null) );
         }
     
    -    public void transfer(int task, Tuple tuple) {
    -        AddressedTuple val = new AddressedTuple(task, tuple);
    +    // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
    +    public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
             if (isDebug) {
    -            LOG.info("TRANSFERRING tuple {}", val);
    +            LOG.info("TRANSFERRING tuple {}", addressedTuple);
    +        }
    +
    +        JCQueue localQueue = getLocalQueue(addressedTuple);
    +        if (localQueue!=null) {
    +            return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
    +        }  else  {
    +            if (remotesBatchSz >= producerBatchSz) {
    +                if ( !workerData.tryFlushRemotes() ) {
    +                    if (pendingEmits != null) {
    +                        pendingEmits.add(addressedTuple);
    +                    }
    +                    return false;
    +                }
    +                remotesBatchSz = 0;
    --- End diff --
    
    @roshannaik and I briefly talked about this and agree that it should be fixed. The clear but higher-impact way is to guard the whole else branch with `synchronized` (or more efficient locking, if any), but @roshannaik wanted to explore ways to minimize the impact of synchronization.
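
    The "clear but higher-impact" option can be sketched as follows. This is a hypothetical, reduced version of the remote branch of `tryTransfer` (the class name is made up and `BooleanSupplier` stands in for `workerData.tryFlushRemotes()`); the diff is truncated after `remotesBatchSz = 0;`, so the increment that follows is an assumption.

    ```java
    import java.util.Queue;
    import java.util.function.BooleanSupplier;

    public class RemoteBatcherSketch {
        private final int producerBatchSz;
        private final BooleanSupplier tryFlushRemotes; // hypothetical stand-in for workerData.tryFlushRemotes()
        private int remotesBatchSz = 0;

        public RemoteBatcherSketch(int producerBatchSz, BooleanSupplier tryFlushRemotes) {
            this.producerBatchSz = producerBatchSz;
            this.tryFlushRemotes = tryFlushRemotes;
        }

        // The whole remote ("else") path runs under one monitor, so the check-flush-reset of
        // remotesBatchSz cannot interleave between threads sharing this instance.
        public synchronized boolean tryTransferRemote(Object tuple, Queue<Object> pendingEmits) {
            if (remotesBatchSz >= producerBatchSz) {
                if (!tryFlushRemotes.getAsBoolean()) {
                    if (pendingEmits != null) {
                        pendingEmits.add(tuple);
                    }
                    return false;
                }
                remotesBatchSz = 0;
            }
            // Assumed: the real method would also hand the tuple to the remote transfer path here.
            remotesBatchSz++;
            return true;
        }

        public synchronized int currentBatchSize() {
            return remotesBatchSz;
        }
    }
    ```

    Every remote emit now pays for a monitor acquisition even when only one thread uses the instance, which is the overhead a cheaper scheme would try to avoid.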


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    I'm running a performance test (TVL) to see if there's any regression here. Since @revans2 already did performance testing, I won't go too deeply (just a couple of tests). I will provide a +1 if things go well.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167286681
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -890,30 +871,91 @@
         public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
     
         /**
    -     * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency
    -     * vs. CPU usage
    +     * Selects the Bolt's Wait Strategy to use when there are no incoming msgs. Used to trade off latency vs CPU usage.
    +     */
    +    @isString
    +    public static final String TOPOLOGY_BOLT_WAIT_STRATEGY = "topology.bolt.wait.strategy";
    +
    +    /**
    +     * Configures park time for WaitStrategyPark.  If set to 0, returns immediately (i.e busy wait).
          */
    -    @isInteger
         @NotNull
    -    public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";
    +    @isPositiveNumber(includeZero = true)
    +    public static final String TOPOLOGY_BOLT_WAIT_PARK_MICROSEC = "topology.bolt.wait.park.microsec";
     
         /**
    -     * The number of tuples to batch before sending to the next thread.  This number is just an initial suggestion and
    -     * the code may adjust it as your topology runs.
    +     * Configures number of iterations to spend in level 1 of WaitStrategyProgressive, before progressing to level 2
          */
    +    @NotNull
         @isInteger
         @isPositiveNumber
    +    public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL1_COUNT =  "topology.bolt.wait.progressive.level1.count";
    +
    +    /**
    +     * Configures number of iterations to spend in level 2 of WaitStrategyProgressive, before progressing to level 3
    +     */
         @NotNull
    -    public static final String TOPOLOGY_DISRUPTOR_BATCH_SIZE="topology.disruptor.batch.size";
    +    @isInteger
    +    @isPositiveNumber
    +    public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL2_COUNT =  "topology.bolt.wait.progressive.level2.count";
     
         /**
    -     * The maximum age in milliseconds a batch can be before being sent to the next thread.  This number is just an
    -     * initial suggestion and the code may adjust it as your topology runs.
    +     * Configures sleep time for WaitStrategyProgressive.
          */
    +    @NotNull
    +    @isPositiveNumber(includeZero = true)
    +    public static final String TOPOLOGY_BOLT_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS = "topology.bolt.wait.progressive.level3.sleep.millis";
    +
    +
    +    /**
    +     * A class that implements a wait strategy for an upstream component (spout/bolt) trying to write to a downstream component
    +     * whose recv queue is full
    +     *
    +     * 1. nextTuple emits no tuples
    +     * 2. The spout has hit maxSpoutPending and can't emit any more tuples
    +     */
    +    @isString
    +    public static final String TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY="topology.backpressure.wait.strategy";
    --- End diff --
    
    Here too, if this is a class name, it would be good to verify early on that it is an implementation of the expected class.
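
    A minimal sketch of such an early check using only the JDK (`Class.forName` and `Class.isAssignableFrom`). The helper name is hypothetical, and the expected type would be whatever interface the wait strategy must implement; for an optional key, the caller would skip the check when the value is unset.

    ```java
    public class ClassConfigValidator {
        // Hypothetical helper: fails fast if the configured class cannot be loaded
        // or does not extend/implement the expected type.
        public static void validateClassName(String key, Object value, Class<?> expectedType) {
            if (!(value instanceof String)) {
                throw new IllegalArgumentException(key + " must be a class name string, got " + value);
            }
            final Class<?> clazz;
            try {
                clazz = Class.forName((String) value);
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException(key + ": class not found: " + value, e);
            }
            if (!expectedType.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(key + ": " + value + " is not a " + expectedType.getName());
            }
        }

        public static void main(String[] args) {
            validateClassName("example.key", "java.util.ArrayList", java.util.List.class); // passes
            try {
                validateClassName("example.key", "java.lang.String", java.util.List.class);
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage());
            }
        }
    }
    ```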


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167379385
  
    --- Diff: pom.xml ---
    @@ -259,6 +259,7 @@
             <snakeyaml.version>1.11</snakeyaml.version>
             <httpclient.version>4.3.3</httpclient.version>
             <clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
    +        <jctools.version>2.0.1</jctools.version>
    --- End diff --
    
    I don't see any references to jctools in any Flux-related pom. Can you point to the specific offending location?


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    @roshannaik Yes, sure. https://issues.apache.org/jira/browse/STORM-2983 This blocks my current work, and I would really appreciate it if it could be solved soon. Thanks.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r161361250
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/ObjectReader.java ---
    @@ -76,6 +76,32 @@ public static Integer getInt(Object o, Integer defaultValue) {
             throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
         }
     
    +    public static Long getLong(Object o) {
    +        return getLong(o, null);
    +    }
    +
    +    public static Long getLong(Object o, Long defaultValue) {
    +        if (null == o) {
    +            return defaultValue;
    +        }
    +
    +        if ( o instanceof Long ||
    +                o instanceof Integer ||
    +                o instanceof Short ||
    +                o instanceof Byte) {
    +            return ((Number) o).longValue();
    +        } else if (o instanceof Double) {
    +            final long l = (Long) o;
    --- End diff --
    
    Yes, my bad. I think it's OK to throw an exception for numbers larger than a Long can hold.
    I don't see the bug in getInt.
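
    The `(Long) o` cast in the diff throws `ClassCastException` for any `Double`. A sketch of one possible fix (class name hypothetical): it accepts integral-valued doubles and throws for fractional or out-of-range values, in line with the reply above. The explicit range test is needed because a `(long)` cast of a large double saturates at `Long.MAX_VALUE`, and `2^63` would otherwise round-trip as equal.

    ```java
    public class ObjectReaderSketch {
        public static Long getLong(Object o, Long defaultValue) {
            if (o == null) {
                return defaultValue;
            }
            if (o instanceof Long || o instanceof Integer || o instanceof Short || o instanceof Byte) {
                return ((Number) o).longValue();
            } else if (o instanceof Double) {
                double d = (Double) o;
                long l = (long) d; // narrowing: truncates fractions, saturates out-of-range values, NaN -> 0
                // Accept only whole numbers inside [-2^63, 2^63); NaN and infinities fail the equality test.
                if ((double) l == d && d >= -0x1p63 && d < 0x1p63) {
                    return l;
                }
            }
            throw new IllegalArgumentException("Don't know how to convert " + o + " to long");
        }
    }
    ```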


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159969475
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -328,20 +328,22 @@ public static boolean isSystemId(String id) {
          * @return the newly created thread
          * @see Thread
          */
    -    public static SmartThread asyncLoop(final Callable afn,
    -            boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
    -            int priority, final boolean isFactory, boolean startImmediately,
    -            String threadName) {
    +    public static SmartThread asyncLoop(final Callable afn, boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
    +                                        int priority, final boolean isFactory, boolean startImmediately,
    +                                        String threadName) {
             SmartThread thread = new SmartThread(new Runnable() {
                 public void run() {
    -                Object s;
                     try {
    -                    Callable fn = isFactory ? (Callable) afn.call() : afn;
    -                    while ((s = fn.call()) instanceof Long) {
    -                        Time.sleepSecs((Long) s);
    +                    final Callable<Long> fn = isFactory ? (Callable<Long>) afn.call() : afn;
    +                    while (true) {
    +                        final Long s = fn.call();
    +                        if (s==null) // then stop running it
    +                            break;
    +                        if (s>0)
    +                            Thread.sleep(s);
    --- End diff --
    
    This needs to be Time.sleep if we want simulated time to work properly.
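    
    To illustrate why this matters: if every sleep in the loop goes through one clock abstraction, a test can substitute a simulated clock and advance time instantly, whereas a direct `Thread.sleep` always blocks for real. The sketch mirrors the new loop in the diff (a null return stops the loop; a positive value is a sleep in milliseconds). `Sleeper`, `SimulatedSleeper`, and `LoopRunner` are hypothetical stand-ins for the role Storm's `Time` utility plays; they are not Storm classes:
    
    ```java
    import java.util.concurrent.Callable;
    import java.util.concurrent.atomic.AtomicLong;
    
    // Hypothetical clock abstraction: all sleeping goes through this, never Thread.sleep directly.
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    
        Sleeper REAL = Thread::sleep; // wall-clock implementation
    }
    
    // Simulated time: "sleeping" only advances a virtual clock, so tests run instantly.
    final class SimulatedSleeper implements Sleeper {
        final AtomicLong nowMs = new AtomicLong();
    
        @Override
        public void sleep(long millis) {
            nowMs.addAndGet(millis);
        }
    }
    
    final class LoopRunner {
        // Calls fn repeatedly: null means stop, a positive value means sleep that many ms.
        // Returns the number of non-null results, for illustration.
        static int runLoop(Callable<Long> fn, Sleeper sleeper) throws Exception {
            int iterations = 0;
            while (true) {
                Long s = fn.call();
                if (s == null) {
                    break;
                }
                iterations++;
                if (s > 0) {
                    sleeper.sleep(s);
                }
            }
            return iterations;
        }
    
        public static void main(String[] args) throws Exception {
            int[] calls = {0};
            SimulatedSleeper clock = new SimulatedSleeper();
            int n = runLoop(() -> ++calls[0] <= 3 ? Long.valueOf(1000L) : null, clock);
            System.out.println(n + " iterations, " + clock.nowMs.get() + " ms of simulated time");
        }
    }
    ```
    
    Production code would pass `Sleeper.REAL`; a test passes a `SimulatedSleeper` and finishes without waiting.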


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159966670
  
    --- Diff: storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java ---
    @@ -20,6 +20,7 @@
     import org.apache.storm.Config;
     import org.apache.storm.generated.ComponentCommon;
     import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.messaging.netty.BackPressureStatus;
    --- End diff --
    
    Is this change needed?  Is this even used here?


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159969170
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/JCQueue.java ---
    @@ -0,0 +1,458 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.utils;
    +
    +import org.apache.storm.policy.IWaitStrategy;
    +import org.apache.storm.metric.api.IStatefulObject;
    +import org.apache.storm.metric.internal.RateTracker;
    +import org.jctools.queues.MessagePassingQueue;
    +import org.jctools.queues.MpscArrayQueue;
    +import org.jctools.queues.MpscUnboundedArrayQueue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +
    +public final class JCQueue implements IStatefulObject {
    +    private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
    +
    +    public static final Object INTERRUPT = new Object();
    +
    +    private final ThroughputMeter emptyMeter = new ThroughputMeter("EmptyBatch");
    --- End diff --
    
    This is never reported anywhere...  Do we want to just delete it and move ThroughputMeter to perf?


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r160010894
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
    @@ -105,19 +134,16 @@ public void reportError(Throwable error) {
                     msgId = MessageId.makeUnanchored();
                 }
     
    -            TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId);
    -            executor.getExecutorTransfer().transfer(t, tuple);
    +            final TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
    +            AddressedTuple adrTuple = new AddressedTuple(t, tuple);
    +            executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
             }
             if (isEventLoggers) {
    -            executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), messageId, random);
    +            taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
             }
    --- End diff --
    
    @HeartSaVioR, you pointed out that some optimizations are possible here that we could tackle in another JIRA. Can you elaborate, or capture your thoughts in a JIRA?


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167325679
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.policy.IWaitStrategy;
    +import org.apache.storm.serialization.ITupleSerializer;
    +import org.apache.storm.tuple.AddressedTuple;
    +import org.apache.storm.utils.JCQueue;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.TransferDrainer;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.utils.Utils.SmartThread;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.Queue;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +// Transfers messages destined to other workers
    +class WorkerTransfer implements JCQueue.Consumer {
    +    static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class);
    +
    +    private final TransferDrainer drainer;
    +    private WorkerState workerState;
    +    private IWaitStrategy backPressureWaitStrategy;
    +
    +    JCQueue transferQueue; // [remoteTaskId] -> JCQueue. Some entries maybe null (if no emits to those tasksIds from this worker)
    +    AtomicBoolean[] remoteBackPressureStatus; // [[remoteTaskId] -> true/false : indicates if remote task is under BP.
    --- End diff --
    
    Same here for package private.


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    @HeartSaVioR  
    I am not seeing the reported excessive CPU usage with 4 workers. I first took some runs on my mid-2015 MacBook Pro (single CPU socket). Since no issue showed up there, I reran the same test on a Linux server (2 sockets, 6 physical cores per socket).
    
    I ran the following command:
    ```
    bin/storm jar storm-loadgen-*.jar org.apache.storm.loadgen.ThroughputVsLatency --rate 85000 --spouts 4 --splitters 4 --counters 4  -c topology.max.spout.pending=5000 -c topology.workers=4
    ```
    
    All runs were similar, so I am posting results from one run of each.
    
    
    ## MacBook Runs:
    **Master without 2306**  (#ab7b4ca)  [**Latency from UI =240ms**]
    
    start(s) | end(s) | rate(tuple/s) | mean(ms) | 99%ile(ms) | 99.9%ile(ms) | cores | mem(MB)
    -- | -- | -- | -- | -- | -- | -- | --
    0 | 30 | 19,319.467 | 8,411.912 | 13,153.337 | 13,254.001 | 5.275 | 340.719
    30 | 60 | 72,613.500 | 16,113.830 | 17,750.295 | 17,884.512 | 6.849 | 364.807
    60 | 90 | 75,519.067 | 19,671.741 | 21,474.836 | 21,558.723 | 6.755 | 344.906
    90 | 120 | 79,225.167 | 22,121.255 | 23,823.647 | 23,941.087 | 6.878 | 222.115
    120 | 150 | 80,967.567 | 23,784.484 | 25,585.254 | 25,652.363 | 6.889 | 365.383
    150 | 180 | 80,062.067 | 25,269.534 | 27,665.629 | 27,866.956 | 6.895 | 343.674
    180 | 210 | 78,306.300 | 27,560.823 | 30,467.424 | 30,551.310 | 7.025 | 436.814
    210 | 240 | 78,586.667 | 29,673.204 | 32,883.343 | 33,000.784 | 6.945 | 293.014
    240 | 270 | 80,500.667 | 31,513.715 | 34,795.946 | 34,829.500 | 6.818 | 416.121
    270 | 300 | 82,625.667 | 32,883.453 | 35,903.242 | 35,970.351 | 6.899 | 364.058
    
    
    **Master with 2306**  (#09e0123) [**Latency from UI =89ms**]
    
    start(s) | end(s) | rate(tuple/s) | mean(ms) | 99%ile(ms) | 99.9%ile(ms) | cores | mem(MB)
    -- | -- | -- | -- | -- | -- | -- | --
    0 | 30 | 19,607.100 | 8,127.475 | 14,277.411 | 14,772.339 | 4.861 | 330.486
    30 | 60 | 85,946.533 | 12,737.708 | 18,471.715 | 18,589.155 | 6.587 | 418.567
    60 | 90 | 91,256.133 | 12,276.112 | 17,901.289 | 18,001.953 | 6.527 | 229.531
    90 | 120 | 95,317.967 | 9,204.098 | 14,529.069 | 14,612.955 | 6.552 | 432.803
    120 | 150 | 97,220.233 | 5,221.476 | 9,865.003 | 10,125.050 | 6.551 | 169.757
    150 | 180 | 92,499.200 | 984.904 | 4,213.178 | 4,546.626 | 6.746 | 280.883
    180 | 210 | 79,557.700 | 1,059.939 | 2,619.343 | 2,766.143 | 6.155 | 430.853
    210 | 240 | 79,766.467 | 2,336.027 | 5,158.994 | 5,347.738 | 6.238 | 288.708
    240 | 270 | 81,595.800 | 4,284.723 | 7,377.781 | 7,528.776 | 6.258 | 315.524
    270 | 300 | 88,294.067 | 5,024.767 | 8,422.162 | 8,493.466 | 6.412 | 263.682
    
    
    ## Linux Server Runs:
    
    **Master without 2306**  (#ab7b4ca)  [**Latency from UI =15ms**]
    
    start_time(s) | end_time(s) | rate(tuple/s) | mean(ms) | 99%ile(ms) | 99.9%ile(ms) | cores | mem(MB)
    -- | -- | -- | -- | -- | -- | -- | --
    0 | 30 | 56,704.80 | 1,194.20 | 3,867.15 | 3,978.30 | 11.294 | 978.701
    30 | 60 | 85,001.70 | 12.018 | 21.479 | 26.837 | 11.931 | 897.105
    60 | 90 | 85,007.17 | 11.84 | 21.152 | 26.345 | 11.575 | 950.129
    90 | 120 | 85,008.13 | 11.781 | 20.447 | 24.986 | 11.688 | 882.768
    120 | 150 | 85,009.57 | 11.841 | 21.021 | 25.592 | 11.735 | 921.339
    150 | 180 | 84,997.60 | 11.763 | 20.791 | 25.199 | 11.476 | 961.395
    180 | 210 | 85,005.57 | 11.824 | 20.66 | 25.281 | 11.798 | 1,000.03
    210 | 240 | 85,008.00 | 11.75 | 20.611 | 25.706 | 11.354 | 1,120.33
    240 | 271 | 82,259.16 | 11.916 | 20.955 | 25.117 | 11.517 | 939.918
    271 | 301 | 85,003.57 | 11.719 | 20.398 | 24.822 | 11.322 | 980.654
    
     
    
    **Master with 2306**  (#09e0123) [**Latency from UI =7.8ms**]
    
    start_time(s) | end_time(s) | rate(tuple/s) | mean(ms) | 99%ile(ms) | 99.9%ile(ms) | cores | mem(MB)
    -- | -- | -- | -- | -- | -- | -- | --
    0 | 30 | 56,701.03 | 426.534 | 2,673.87 | 2,793.41 | 7.693 | 290.006
    30 | 60 | 85,004.40 | 3.387 | 8.294 | 14.655 | 5.89 | 231.328
    60 | 90 | 85,002.20 | 3.332 | 7.66 | 10.945 | 5.791 | 247.683
    90 | 121 | 82,259.87 | 3.33 | 7.516 | 9.839 | 5.591 | 264.324
    121 | 151 | 85,004.37 | 3.349 | 7.737 | 10.969 | 5.819 | 272.916
    151 | 181 | 85,001.07 | 3.323 | 7.434 | 9.961 | 5.747 | 203.54
    181 | 211 | 85,002.57 | 3.335 | 7.586 | 10.281 | 5.794 | 304.567
    211 | 241 | 85,005.03 | 3.326 | 7.475 | 9.921 | 5.836 | 317.026
    241 | 271 | 85,003.27 | 3.339 | 7.565 | 10.527 | 5.723 | 251.676
    271 | 301 | 85,002.00 | 3.351 | 7.799 | 12.394 | 5.716 | 377.182
    
    
    ## Summary:
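    **On Linux:** Both CPU and memory usage were significantly better with 2306. After the first 30 s window, the cores column stays around 5.6 to 5.9 versus 11.3 to 11.9 without it, and reported memory is roughly 200 to 380 MB versus 880 to 1,120 MB.
    Actual latency (taken from the UI) was also much better with 2306: 7.8 ms versus 15 ms.
    
    **On MacBook:** CPU and memory usage were relatively close, slightly favoring 2306. Again, the actual latency was much better with 2306: 89 ms versus 240 ms.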
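    
    As a quick cross-check of the Linux CPU numbers, the mean of the `cores` column after the 0-30 s warm-up window can be computed directly from the two Linux tables. A small sketch; the arrays are copied from those tables and `CoreAverages` is just an illustrative name:
    
    ```java
    final class CoreAverages {
        // "cores" column of the Linux runs, excluding the 0-30 s warm-up row.
        static final double[] WITHOUT_2306 =
            {11.931, 11.575, 11.688, 11.735, 11.476, 11.798, 11.354, 11.517, 11.322};
        static final double[] WITH_2306 =
            {5.89, 5.791, 5.591, 5.819, 5.747, 5.794, 5.836, 5.723, 5.716};
    
        // Arithmetic mean of the samples.
        static double mean(double[] xs) {
            double sum = 0;
            for (double x : xs) {
                sum += x;
            }
            return sum / xs.length;
        }
    
        public static void main(String[] args) {
            System.out.printf("without 2306: %.2f cores%n", mean(WITHOUT_2306));
            System.out.printf("with 2306:    %.2f cores%n", mean(WITH_2306));
        }
    }
    ```
    
    Run as-is, it prints about 11.60 cores without 2306 and 5.77 cores with it, i.e. roughly half the CPU at the same ~85,000 tuples/s.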



---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    It's a nice feeling to see this merged in. I will look into the CPU usage issue reported by @HeartSaVioR. I just wanted to specifically call out and thank the following folks, who have been very helpful in making this possible:
    Sapin Amin, @harshach, @revans2 , @arunmahadevan, @HeartSaVioR and @satishd !


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167402753
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import org.apache.storm.serialization.KryoValuesDeserializer;
    +import org.apache.storm.serialization.KryoValuesSerializer;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
    +public class BackPressureStatus implements java.io.Serializable {
    --- End diff --
    
    ControlMessage, MessageBatch, and SaslMessageToken are handled explicitly by MessageDecoder and MessageEncoder using their `buffer()` and `read()` methods.
    When ControlMessage, SaslMessageToken, or MessageBatch writes itself out to a buffer, it does not use Kryo.
    
    https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java#L60-L64
    
    https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslMessageToken.java#L86-L101
    
    https://github.com/apache/storm/blob/aaebc3b237916340156ac3b8dc956d6c62c34983/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageBatch.java#L80-L93
    
    It is a hand-coded protocol (for better or worse).
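    
    To make "hand-coded" concrete: each message type writes its own fields into the buffer instead of going through a generic serializer. Below is a minimal sketch of that style using java.nio.ByteBuffer rather than Netty's ChannelBuffer. The frame layout (2-byte type code, 4-byte length, opaque payload) and the code value are assumptions for illustration, not Storm's actual wire format:
    
    ```java
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    
    // Hypothetical hand-coded frame: [2-byte type code][4-byte length][payload].
    // The payload is opaque here; in a real pipeline it could be bytes produced by Kryo.
    final class HandCodedFrame {
        static final short STATUS_CODE = -600; // hypothetical type code, not Storm's actual value
    
        // Encodes into a buffer that is flipped and ready to be read or written out.
        static ByteBuffer encode(short code, byte[] payload) {
            ByteBuffer buf = ByteBuffer.allocate(Short.BYTES + Integer.BYTES + payload.length);
            buf.putShort(code).putInt(payload.length).put(payload);
            buf.flip();
            return buf;
        }
    
        // Returns the payload, or null if the frame is incomplete (the position is restored so
        // the caller can retry once more bytes arrive). Throws on an unexpected type code.
        static byte[] decode(ByteBuffer buf, short expectedCode) {
            if (buf.remaining() < Short.BYTES + Integer.BYTES) {
                return null;
            }
            buf.mark();
            short code = buf.getShort();
            if (code != expectedCode) {
                buf.reset();
                throw new IllegalArgumentException("Unexpected type code " + code);
            }
            int length = buf.getInt();
            if (buf.remaining() < length) {
                buf.reset();
                return null;
            }
            byte[] payload = new byte[length];
            buf.get(payload);
            return payload;
        }
    
        public static void main(String[] args) {
            byte[] body = "bp-tasks:1,2,3".getBytes(StandardCharsets.UTF_8);
            ByteBuffer frame = encode(STATUS_CODE, body);
            System.out.println(new String(decode(frame, STATUS_CODE), StandardCharsets.UTF_8));
        }
    }
    ```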
    
    The messages inside MessageBatch have already been serialized using kryo elsewhere in the pipeline.  
    
    We hid the load messages as a tuple inside a MessageBatch so that we could support rolling upgrades, but it is an ugly hack.
    
    BackPressureStatus is serialized/deserialized using MessageEncoder/MessageDecoder, but it uses kryo internally to do it.
    
    So please remove Serializable from BackPressureStatus and register it with Kryo instead. Do not remove `BackPressureStatus.buffer()` or `BackPressureStatus.read()`. I think the changes to KryoTupleSerializer to send a BackPressureState can be removed, assuming no one is calling them directly.
    
    The simplest way to test this is to run a topology with multiple workers and `topology.fall.back.on.java.serialization=false`. This config stops Kryo from falling back to Java serialization.
    
    For normal tuples and end users, you can set `topology.testing.always.try.serialize=true`, and every emitted tuple will be serialized, even in local mode. This is a way to unit test that you have set up Kryo appropriately, but BackPressureStatus is a special message, so we need to test it differently.
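    
    The two settings mentioned above can be expressed as plain topology-config maps. A sketch, assuming the resulting Map is passed wherever a topology conf is accepted (for example `StormSubmitter.submitTopology`); `SerializationCheckConf` is a hypothetical helper, and only the config keys come from this thread:
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    final class SerializationCheckConf {
        // Multi-worker run in which Kryo may not fall back to Java serialization,
        // so a class Kryo cannot handle should show up as an error rather than be hidden.
        static Map<String, Object> multiWorkerNoJavaFallback(int workers) {
            Map<String, Object> conf = new HashMap<>();
            conf.put("topology.workers", workers); // more than one, so messages cross workers
            conf.put("topology.fall.back.on.java.serialization", false);
            return conf;
        }
    
        // For ordinary tuples: serialize every emitted tuple, even in local mode.
        static Map<String, Object> alwaysSerializeInLocalMode() {
            Map<String, Object> conf = new HashMap<>();
            conf.put("topology.testing.always.try.serialize", true);
            return conf;
        }
    
        public static void main(String[] args) {
            System.out.println(multiWorkerNoJavaFallback(4));
            System.out.println(alwaysSerializeInLocalMode());
        }
    }
    ```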


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    I'm not very familiar with RAS (the Resource Aware Scheduler) and how it works; it seems to be enabled by default. Can you elaborate on what setup, if any, was needed for RAS in your case?


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r168044307
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
    @@ -22,19 +22,23 @@
     import org.apache.storm.executor.TupleInfo;
     import org.apache.storm.spout.ISpout;
     import org.apache.storm.spout.ISpoutOutputCollector;
    +import org.apache.storm.tuple.AddressedTuple;
     import org.apache.storm.tuple.MessageId;
     import org.apache.storm.tuple.TupleImpl;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.apache.storm.utils.MutableLong;
     import org.apache.storm.utils.RotatingMap;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.util.ArrayList;
     import java.util.List;
     import java.util.Random;
     
    +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
    --- End diff --
    
    OK. Please let me know if you plan to address this within the Storm 2.0.0 time frame; I'll add it to the epic for releasing Storm 2.0.0. Thanks!


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    Yes sure. My storm.yaml is as follows:
    ```
    storm.zookeeper.servers:
         - "persistmist.corp.ne1.yahoo.com"
    
    nimbus.seeds: ["persistmist.corp.ne1.yahoo.com", "localhost"]
    
    storm.local.dir: "/tmp/apache-storm-2.0.0-SNAPSHOT/storm-local"
    supervisor.run.worker.as.user: true
    supervisor.worker.launcher: /etc/storm/worker-launcher
    nimbus.authorizer: "org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer"
    nimbus.supervisor.users:
       - "mapredqa"
    nimbus.admins:
       - "mapredqa"
       - "ethan"
    nimbus.users:
       - "mapredqa"
       - "ethan"
    ui.users:
       - "mapredqa"
       - "ethan"
    logs.users:
       - "mapredqa"
       - "ethan"
    
    storm.thrift.transport: "org.apache.storm.security.auth.kerberos.KerberosSaslTransportPlugin"
    java.security.auth.login.config: "/jaas/storm_jaas.conf"
    
    nimbus.childopts: "-Xmx1024m -Djava.security.auth.login.config=/jaas/storm_jaas.conf " # " -Dsun.security.krb5.debug=true"
    ui.childopts: "-Xmx768m -Djava.security.auth.login.config=/jaas/storm_jaas.conf" # -Dsun.security.krb5.debug=true"
    supervisor.childopts: "-Xmx256m -Djava.security.auth.login.config=/jaas/storm_jaas.conf"
    
    storm.principal.tolocal: "org.apache.storm.security.auth.KerberosPrincipalToLocal"
    storm.zookeeper.superACL: "sasl:mapredqa"
    storm.blobstore.acl.validation.enabled: false
    ui.header.buffer.bytes: 65536
    ui.filter: "org.apache.hadoop.security.authentication.server.AuthenticationFilter"
    ui.filter.params:
       "type": "kerberos"
       "kerberos.principal": "HTTP/persistmist.corp.ne1.yahoo.com"
       "kerberos.keytab": "/keytabs/HTTP.keytab"
       "kerberos.name.rules": "DEFAULT"
    scheduler.display.resource: true
    storm.scheduler: "org.apache.storm.scheduler.resource.ResourceAwareScheduler"
    ```


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167387844
  
    --- Diff: pom.xml ---
    @@ -259,6 +259,7 @@
             <snakeyaml.version>1.11</snakeyaml.version>
             <httpclient.version>4.3.3</httpclient.version>
             <clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
    +        <jctools.version>2.0.1</jctools.version>
    --- End diff --
    
    I misread things; please ignore this comment.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167385590
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
    @@ -44,14 +48,15 @@
         private final Boolean isEventLoggers;
         private final Boolean isDebug;
         private final RotatingMap<Long, TupleInfo> pending;
    +    private TupleInfo globalTupleInfo = new TupleInfo();  // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed).
    --- End diff --
    
    Accessing the ThreadLocal (TL) instance via ThreadLocal.get() typically involves a map lookup behind the scenes.
    
    **Related Note:** I reluctantly used a TL to associate each JCQueue producer with its JCQueue.BatchInserter instance. Reluctantly, because I noticed a performance hit in some targeted microbenchmarking. I used it anyway because it was a performance improvement over the ConcurrentHashMap employed in Disruptor, and eliminating the TL needed a bigger change to the interface and the producers. I think a TL-free JCQueue is possible and would gain some performance, perhaps in a follow-up JIRA.
    
    Although many decisions were measured, the scope made it infeasible to measure every one. So on the critical path I have taken this general approach:
    - Avoid locks and synchronization, and use lock-free or wait-free approaches where synchronization is unavoidable.
    - Avoid map lookups and object creation.
    
    This is a case of avoiding synchronization, TL map lookups, and object allocation.
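    
    The trade-off described above can be sketched in a few lines: one reusable scratch object per (externally synchronized) collector, versus a per-thread object fetched with `ThreadLocal.get()` on every call. The names below (`ScratchReuse`, `EmitInfo`, and the two collectors) are hypothetical and only loosely modeled on `globalTupleInfo`; they are not Storm code:
    
    ```java
    import java.util.function.Consumer;
    
    final class ScratchReuse {
        // Mutable holder that is overwritten on every emit (illustrative fields only).
        static final class EmitInfo {
            String stream;
            long messageId;
        }
    
        // Option 1: one scratch object per collector. No allocation and no lookup per emit,
        // but NOT thread safe: concurrent callers must synchronize externally.
        static final class ReusingCollector {
            private final EmitInfo scratch = new EmitInfo();
            private final Consumer<EmitInfo> downstream;
    
            ReusingCollector(Consumer<EmitInfo> downstream) {
                this.downstream = downstream;
            }
    
            void emit(String stream, long messageId) {
                scratch.stream = stream;
                scratch.messageId = messageId;
                downstream.accept(scratch); // downstream must copy what it needs before the next emit
            }
        }
    
        // Option 2: one scratch object per thread. Safe without locks, but every emit pays
        // for ThreadLocal.get(), which looks the value up in the current thread's map.
        static final class ThreadLocalCollector {
            private static final ThreadLocal<EmitInfo> SCRATCH = ThreadLocal.withInitial(EmitInfo::new);
            private final Consumer<EmitInfo> downstream;
    
            ThreadLocalCollector(Consumer<EmitInfo> downstream) {
                this.downstream = downstream;
            }
    
            void emit(String stream, long messageId) {
                EmitInfo info = SCRATCH.get();
                info.stream = stream;
                info.messageId = messageId;
                downstream.accept(info);
            }
        }
    
        public static void main(String[] args) {
            Consumer<EmitInfo> print = info -> System.out.println(info.stream + "/" + info.messageId);
            new ReusingCollector(print).emit("default", 1L);
            new ThreadLocalCollector(print).emit("default", 2L);
        }
    }
    ```
    
    The reusing variant works only because the consumer reads the fields before the next `emit()` overwrites them; a consumer that kept the reference would see later values.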


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r161358582
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -196,19 +197,21 @@ public static Executor mkExecutor(WorkerState workerState, List<Long> executorId
                 executor.stats = new BoltExecutorStats(ConfigUtils.samplingRate(executor.getStormConf()),ObjectReader.getInt(executor.getStormConf().get(Config.NUM_STAT_BUCKETS)));
             }
     
    +        int minId = Integer.MAX_VALUE;
             Map<Integer, Task> idToTask = new HashMap<>();
             for (Integer taskId : taskIds) {
    +            minId = Math.min(minId, taskId);
                 try {
                     Task task = new Task(executor, taskId);
    -                executor.sendUnanchored(
    -                        task, StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer());
    +                task.sendUnanchored( StormCommon.SYSTEM_STREAM_ID, new Values("startup"), executor.getExecutorTransfer(), null); // TODO: Roshan: does this get delivered/handled anywhere ?
    --- End diff --
    
    I don't feel strongly about it; I will retain it if it has some use.


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    - Thanks for giving it a spin and posting the numbers. The higher CPU usage in multi-worker mode is not something I have seen before; I will take a look. Can you please share the command line you used? Were all 4 workers on the same host?
    - The plan is to rerun the previously published suite and check whether any regressions were introduced by the recent changes and the rebase, which I think brought in about 300 commits since the last perf run. Will do this once we have
    - **Side Note:** If you run TVL again, do pay attention to the latency on the UI. TVL's report does not seem to include the actual latency.
    
    Will squash the commits and refresh this PR so that it can be committed.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167331543
  
    --- Diff: storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java ---
    @@ -194,7 +194,12 @@ public void reportError(Throwable t) {
             public void emitDirect(int task, String stream, List<Object> values, Object id) {
                 throw new UnsupportedOperationException("Trident does not support direct streams");
             }
    -        
    +
    +        @Override
    +        public void flush() {
    +            //NOOP   //TODO: Roshan: validate if this is OK
    --- End diff --
    
    The TODO needs to go away. Is it OK not to have a flush here?
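    
    Whether a no-op `flush()` is safe depends on whether that collector buffers anything. The sketch below shows a producer that batches emits (in the spirit of the producer batch size behind `Config.TOPOLOGY_PRODUCER_BATCH_SIZE`); in such a producer, skipping `flush()` would leave a partial batch unsent. `BatchingProducer` is hypothetical, and the sketch makes no claim about whether the Trident collector here actually buffers:
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    final class BatchingProducer {
        private final int batchSize;
        private final List<Object> pending = new ArrayList<>();
        // Stand-in for the consumer's queue: each entry is one published batch.
        private final List<List<Object>> delivered = new ArrayList<>();
    
        BatchingProducer(int batchSize) {
            this.batchSize = batchSize;
        }
    
        void emit(Object item) {
            pending.add(item);
            if (pending.size() >= batchSize) {
                flush(); // a full batch is published immediately
            }
        }
    
        // Publishes a partially filled batch; without it, up to batchSize - 1 items
        // could sit unsent until more items arrive.
        void flush() {
            if (!pending.isEmpty()) {
                delivered.add(new ArrayList<>(pending));
                pending.clear();
            }
        }
    
        List<List<Object>> delivered() {
            return delivered;
        }
    
        int pendingCount() {
            return pending.size();
        }
    
        public static void main(String[] args) {
            BatchingProducer producer = new BatchingProducer(3);
            for (int i = 0; i < 4; i++) {
                producer.emit(i);
            }
            System.out.println("before flush: " + producer.delivered() + ", pending " + producer.pendingCount());
            producer.flush();
            System.out.println("after flush:  " + producer.delivered() + ", pending " + producer.pendingCount());
        }
    }
    ```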


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    Yes, I only ran the performance tests on a single node.
    
    Script for running TVL:
    * https://gist.github.com/HeartSaVioR/139a1a8ea6a6e285578598095d1c816f (for 4 workers)
    * https://gist.github.com/HeartSaVioR/27300c7e476efb4b58e86656e2073505 (for 1 worker)
    
    I'm also planning to share some scripts for verifying releases and for running the daemons on a node via tmux/tmuxinator.


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    I think I have addressed all the major and minor issues as well. 


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159960326
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/Executor.java ---
    @@ -225,51 +228,62 @@ private static String getExecutorType(WorkerTopologyContext workerTopologyContex
             }
         }
     
    +    public Queue<AddressedTuple> getPendingEmits() {
    +        return pendingEmits;
    +    }
    +
         /**
          * separated from mkExecutor in order to replace executor transfer in executor data for testing
          */
         public ExecutorShutdown execute() throws Exception {
             LOG.info("Loading executor tasks " + componentId + ":" + executorId);
     
    -        registerBackpressure();
    -        Utils.SmartThread systemThreads =
    -                Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);
    -
             String handlerName = componentId + "-executor" + executorId;
    -        Utils.SmartThread handlers =
    +        Utils.SmartThread handler =
                     Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
             setupTicks(StatsUtil.SPOUT.equals(type));
     
             LOG.info("Finished loading executor " + componentId + ":" + executorId);
    -        return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask);
    +        return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask);
         }
     
         public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
     
    -    @SuppressWarnings("unchecked")
         @Override
    -    public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
    -        ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
    -        for (AddressedTuple addressedTuple : addressedTuples) {
    -            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
    -            int taskId = addressedTuple.getDest();
    -            if (isDebug) {
    -                LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
    -            }
    +    public void accept(Object event) {
    +        if (event == JCQueue.INTERRUPT) {
    +            throw new RuntimeException(new InterruptedException("JCQ processing interrupted") );
    --- End diff --
    
    Could you explain a bit more about when JCQueue.INTERRUPT happens and where this exception is caught/handled? Almost everywhere else we use an InterruptedException, it is treated as the worker going down in an orderly manner, so the thread ends without complaining. I just want to be sure that we don't accidentally shut down part of the worker in some odd cases.
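    
    One common pattern for the case being asked about works like this. A sentinel object in the queue signals shutdown, and the handler converts it into a RuntimeException wrapping an InterruptedException (as `accept()` does in the diff). The loop driving the handler then checks the cause chain, so that an interrupt ends the thread quietly while any other exception is reported. A hypothetical sketch using `ArrayBlockingQueue` in place of JCQueue; it does not claim to reflect how Storm's `asyncLoop` actually handles the exception:
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.function.Consumer;
    
    final class SentinelDemo {
        // Plays the role of JCQueue.INTERRUPT: a unique object compared by identity.
        static final Object INTERRUPT = new Object();
    
        // Handler in the style of the diff's accept(): the sentinel becomes a wrapped InterruptedException.
        static final class RecordingHandler implements Consumer<Object> {
            final List<Object> processed = new ArrayList<>();
    
            @Override
            public void accept(Object event) {
                if (event == INTERRUPT) {
                    throw new RuntimeException(new InterruptedException("queue processing interrupted"));
                }
                processed.add(event);
            }
        }
    
        // True if any exception in the cause chain is an InterruptedException.
        static boolean causedByInterrupt(Throwable t) {
            for (Throwable c = t; c != null; c = c.getCause()) {
                if (c instanceof InterruptedException) {
                    return true;
                }
            }
            return false;
        }
    
        // Drives the handler until it throws. Returns true for an orderly (interrupt) exit,
        // false for a genuine failure, which is recorded in errors.
        static boolean drain(BlockingQueue<Object> queue, Consumer<Object> handler, List<Throwable> errors) {
            try {
                while (true) {
                    handler.accept(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return true;
            } catch (RuntimeException e) {
                if (causedByInterrupt(e)) {
                    return true; // shutting down: end quietly
                }
                errors.add(e);
                return false;
            }
        }
    
        public static void main(String[] args) {
            BlockingQueue<Object> queue = new ArrayBlockingQueue<>(8);
            queue.add("a");
            queue.add("b");
            queue.add(INTERRUPT);
            RecordingHandler handler = new RecordingHandler();
            List<Throwable> errors = new ArrayList<>();
            System.out.println(drain(queue, handler, errors) + " " + handler.processed + " " + errors);
        }
    }
    ```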


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159962361
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
    @@ -17,72 +17,124 @@
      */
     package org.apache.storm.executor;
     
    -import com.google.common.annotations.VisibleForTesting;
    -import com.lmax.disruptor.EventHandler;
     import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
     import org.apache.storm.serialization.KryoTupleSerializer;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.tuple.Tuple;
    -import org.apache.storm.utils.DisruptorQueue;
    -import org.apache.storm.utils.MutableObject;
    +import org.apache.storm.utils.JCQueue;
     import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Utils;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.ArrayList;
    +import java.util.Collections;
     import java.util.Map;
    -import java.util.concurrent.Callable;
    +import java.util.Queue;
     
    -public class ExecutorTransfer implements EventHandler, Callable {
    +// Every executor has an instance of this class
    +public class ExecutorTransfer  {
         private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
     
         private final WorkerState workerData;
    -    private final DisruptorQueue batchTransferQueue;
    -    private final Map<String, Object> topoConf;
         private final KryoTupleSerializer serializer;
    -    private final MutableObject cachedEmit;
         private final boolean isDebug;
    +    private final int producerBatchSz;
    +    private int remotesBatchSz = 0;
    +    private int indexingBase = 0;
    +    private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker
    +    private ArrayList<JCQueue> queuesToFlush; // [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance
     
    -    public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map<String, Object> topoConf) {
    +
    +    public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
             this.workerData = workerData;
    -        this.batchTransferQueue = batchTransferQueue;
    -        this.topoConf = topoConf;
             this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
    -        this.cachedEmit = new MutableObject(new ArrayList<>());
             this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
    +        this.producerBatchSz = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
    +    }
    +
    +    // to be called after all Executor objects in the worker are created and before this object is used
    +    public void initLocalRecvQueues() {
    +        Integer minTaskId = workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
    +        this.localReceiveQueues = Utils.convertToArray( workerData.getLocalReceiveQueues(), minTaskId);
    +        this.indexingBase = minTaskId;
    +        this.queuesToFlush = new ArrayList<JCQueue>(Collections.nCopies(localReceiveQueues.size(), null) );
         }
     
    -    public void transfer(int task, Tuple tuple) {
    -        AddressedTuple val = new AddressedTuple(task, tuple);
    +    // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
    +    public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
             if (isDebug) {
    -            LOG.info("TRANSFERRING tuple {}", val);
    +            LOG.info("TRANSFERRING tuple {}", addressedTuple);
    +        }
    +
    +        JCQueue localQueue = getLocalQueue(addressedTuple);
    +        if (localQueue!=null) {
    +            return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
    +        }  else  {
    +            if (remotesBatchSz >= producerBatchSz) {
    +                if ( !workerData.tryFlushRemotes() ) {
    +                    if (pendingEmits != null) {
    +                        pendingEmits.add(addressedTuple);
    +                    }
    +                    return false;
    +                }
    +                remotesBatchSz = 0;
    --- End diff --
    
    Do we have a race condition here? I believe that this method can be called from multiple threads, and if so, we now have to worry about keeping remotesBatchSz consistent.
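If `tryTransfer()` can indeed run on several threads at once, a plain `int` counter such as `remotesBatchSz` can lose increments, or be reset by one thread while another is mid-update. One option, sketched here under that assumption, is an atomic counter where exactly one caller observes each full batch; `BatchCounter` is a hypothetical helper, not part of the PR. The other common option is to confine the counter to a single thread, if the callers can be arranged that way, which avoids the compare-and-set cost entirely.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a batch counter that stays consistent when several
// producer threads share one transfer object.
final class BatchCounter {
    private final int batchSize;
    private final AtomicInteger pending = new AtomicInteger();

    BatchCounter(int batchSize) {
        this.batchSize = batchSize;
    }

    // Records one buffered message. Returns true for exactly one caller each
    // time a full batch accumulates; that caller is responsible for flushing.
    // The check and the reset happen in one CAS, so no increment is lost.
    boolean recordAndCheckFull() {
        while (true) {
            int cur = pending.get();
            int next = cur + 1;
            if (next >= batchSize) {
                if (pending.compareAndSet(cur, 0)) {
                    return true;
                }
            } else if (pending.compareAndSet(cur, next)) {
                return false;
            }
            // CAS lost a race with another thread: re-read and retry
        }
    }

    int pendingCount() {
        return pending.get();
    }
}
```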


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    Congrats @roshannaik, great effort and perseverance to get this in, and thanks to @revans2 for reviewing in great detail.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r160066094
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
    @@ -177,6 +195,35 @@ public BuiltinMetrics getBuiltInMetrics() {
             return builtInMetrics;
         }
     
    +
    +    // Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument
    +    public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
    +        Tuple tuple = getTuple(stream, values);
    +        List<Integer> tasks = getOutgoingTasks(stream, values);
    +        for (Integer t : tasks) {
    +            AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
    +            transfer.tryTransfer(addressedTuple, pendingEmits);
    +        }
    +    }
    +
    +    /**
    +     * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api).
    +     */
    +    public void sendToEventLogger(Executor executor, List values,
    --- End diff --
    
    I reread the code and found that the sampling percentage can be changed. I was thinking about reducing the random.nextDouble() calls, but in that case we may not be able to. Please ignore my previous comment.
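For reference, a sketch of the trade-off being discussed, assuming the sampling percentage lives in a field that can be updated at runtime (the javadoc above says the debug flag is set via the nimbus API). Because the value can change between calls, each decision reads it fresh and draws a random number; only the 0% and 100% cases can skip `nextDouble()`. `EventLogSampler` is a hypothetical class, and `ThreadLocalRandom` is used instead of a shared `Random` to avoid contention between threads.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch: per-tuple sampling with a percentage that may change at runtime.
final class EventLogSampler {
    private volatile double samplingPct; // 0.0 .. 100.0, may be updated by another thread

    EventLogSampler(double samplingPct) {
        setSamplingPct(samplingPct);
    }

    void setSamplingPct(double pct) {
        if (pct < 0.0 || pct > 100.0) {
            throw new IllegalArgumentException("sampling pct out of range: " + pct);
        }
        this.samplingPct = pct;
    }

    boolean shouldSample() {
        double pct = samplingPct; // one volatile read per decision
        if (pct <= 0.0) {
            return false;         // fast path: no random draw needed
        }
        if (pct >= 100.0) {
            return true;          // fast path: no random draw needed
        }
        // nextDouble() is in [0, 1), so this is true with probability pct/100
        return ThreadLocalRandom.current().nextDouble() * 100.0 < pct;
    }
}
```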


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167288058
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Acker.java ---
    @@ -99,17 +100,19 @@ public void execute(Tuple input) {
                 pending.put(id, curr);
             } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
                 resetTimeout = true;
    -            if (curr != null) {
    +            if (curr == null) {
    --- End diff --
    
    Why is this change being made?


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167288382
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java ---
    @@ -362,6 +362,8 @@ public static void addSystemStreams(StormTopology topology) {
         public static void addEventLogger(Map<String, Object> conf, StormTopology topology) {
             Integer numExecutors = ObjectReader.getInt(conf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS),
                     ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
    +        if(numExecutors==null || numExecutors==0)
    --- End diff --
    
    nit: can we fix the style here? There should be a space after the `if`, and the body should be wrapped in `{` `}`.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167385890
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import org.apache.storm.serialization.KryoValuesDeserializer;
    +import org.apache.storm.serialization.KryoValuesSerializer;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
    +public class BackPressureStatus implements java.io.Serializable {
    --- End diff --
    
    I was not aware of that. Do we have an example of what I should do instead?


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    @Ethanlm I should be able to take a look in a couple of days. Can you share any settings you are using in the config file, other than the one you noted on the command line (topology.debug=true)? Especially RAS-related ones.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167293987
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
    @@ -456,137 +450,135 @@ public void refreshStormActive(Runnable callback) {
             }
         }
     
    -    public void refreshThrottle() {
    -        boolean backpressure = stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::refreshThrottle);
    -        this.throttleOn.set(backpressure);
    -    }
    -
    -    private static double getQueueLoad(DisruptorQueue q) {
    -        DisruptorQueue.QueueMetrics qMetrics = q.getMetrics();
    +    private static double getQueueLoad(JCQueue q) {
    +        JCQueue.QueueMetrics qMetrics = q.getMetrics();
             return ((double) qMetrics.population()) / qMetrics.capacity();
         }
     
         public void refreshLoad(List<IRunningExecutor> execs) {
    -        Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(taskIds));
    +        Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(localTaskIds));
             Long now = System.currentTimeMillis();
             Map<Integer, Double> localLoad = new HashMap<>();
    -        for (IRunningExecutor exec: execs) {
    +        for (IRunningExecutor exec : execs) {
                 double receiveLoad = getQueueLoad(exec.getReceiveQueue());
    -            double sendLoad = getQueueLoad(exec.getSendQueue());
    -            localLoad.put(exec.getExecutorId().get(0).intValue(), Math.max(receiveLoad, sendLoad));
    +            localLoad.put(exec.getExecutorId().get(0).intValue(), receiveLoad);
             }
     
             Map<Integer, Load> remoteLoad = new HashMap<>();
             cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks)));
             loadMapping.setLocal(localLoad);
             loadMapping.setRemote(remoteLoad);
     
    -        if (now > nextUpdate.get()) {
    +        if (now > nextLoadUpdate.get()) {
                 receiver.sendLoadMetrics(localLoad);
    -            nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
    +            nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
             }
         }
     
    +    // checks if the tasks which had back pressure are now free again. if so, sends an update to other workers
    +    public void refreshBackPressureStatus() {
    +        LOG.debug("Checking for change in Backpressure status on worker's tasks");
    +        boolean bpSituationChanged = bpTracker.refreshBpTaskList();
    +        if (bpSituationChanged) {
    +            BackPressureStatus bpStatus = bpTracker.getCurrStatus();
    +            receiver.sendBackPressureStatus(bpStatus);
    +        }
    +    }
    +
    +
         /**
          * we will wait all connections to be ready and then activate the spout/bolt
          * when the worker bootup.
          */
         public void activateWorkerWhenAllConnectionsReady() {
             int delaySecs = 0;
             int recurSecs = 1;
    -        refreshActiveTimer.schedule(delaySecs, new Runnable() {
    -            @Override public void run() {
    +        refreshActiveTimer.schedule(delaySecs,
    +            () -> {
                     if (areAllConnectionsReady()) {
                         LOG.info("All connections are ready for worker {}:{} with id {}", assignmentId, port, workerId);
                         isWorkerActive.set(Boolean.TRUE);
                     } else {
                         refreshActiveTimer.schedule(recurSecs, () -> activateWorkerWhenAllConnectionsReady(), false, 0);
                     }
                 }
    -        });
    +        );
         }
     
         public void registerCallbacks() {
             LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
             receiver.registerRecv(new DeserializingConnectionCallback(topologyConf,
                 getWorkerTopologyContext(),
    -            this::transferLocal));
    +            this::transferLocalBatch));
    +        // Send curr BackPressure status to new clients
    +        receiver.registerNewConnectionResponse(
    +            () -> {
    +                BackPressureStatus bpStatus = bpTracker.getCurrStatus();
    +                LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus);
    +                return bpStatus;
    +            }
    +        );
         }
     
    -    public void transferLocal(List<AddressedTuple> tupleBatch) {
    -        Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
    -        for (AddressedTuple tuple : tupleBatch) {
    -            Integer executor = taskToShortExecutor.get(tuple.dest);
    -            if (null == executor) {
    -                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
    -                continue;
    -            }
    -            List<AddressedTuple> current = grouped.get(executor);
    -            if (null == current) {
    -                current = new ArrayList<>();
    -                grouped.put(executor, current);
    -            }
    -            current.add(tuple);
    -        }
    +    /* Not a Blocking call. If cannot emit, will add 'tuple' to pendingEmits and return 'false'. 'pendingEmits' can be null */
    +    public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
    +        return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
    +    }
     
    -        for (Map.Entry<Integer, List<AddressedTuple>> entry : grouped.entrySet()) {
    -            DisruptorQueue queue = shortExecutorReceiveQueueMap.get(entry.getKey());
    -            if (null != queue) {
    -                queue.publish(entry.getValue());
    -            } else {
    -                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
    -            }
    -        }
    +    public void flushRemotes() throws InterruptedException {
    +        workerTransfer.flushRemotes();
         }
     
    -    public void transfer(KryoTupleSerializer serializer, List<AddressedTuple> tupleBatch) {
    -        if (trySerializeLocal) {
    -            assertCanSerialize(serializer, tupleBatch);
    -        }
    -        List<AddressedTuple> local = new ArrayList<>();
    -        Map<Integer, List<TaskMessage>> remoteMap = new HashMap<>();
    -        for (AddressedTuple addressedTuple : tupleBatch) {
    -            int destTask = addressedTuple.getDest();
    -            if (taskIds.contains(destTask)) {
    -                // Local task
    -                local.add(addressedTuple);
    -            } else {
    -                // Using java objects directly to avoid performance issues in java code
    -                if (! remoteMap.containsKey(destTask)) {
    -                    remoteMap.put(destTask, new ArrayList<>());
    +    public boolean tryFlushRemotes() {
    +        return workerTransfer.tryFlushRemotes();
    +    }
    +
    +    // Receives msgs from remote workers and feeds them to local executors. If any receiving local executor is under Back Pressure,
    +    // informs other workers about back pressure situation. Runs in the NettyWorker thread.
    +    private void transferLocalBatch(ArrayList<AddressedTuple> tupleBatch) {
    +        int lastOverflowCount = 0; // overflowQ size at the time the last BPStatus was sent
    +
    +        for (int i = 0; i < tupleBatch.size(); i++) {
    +            AddressedTuple tuple = tupleBatch.get(i);
    +            JCQueue queue = shortExecutorReceiveQueueMap.get(tuple.dest);
    +
    +            // 1- try adding to main queue if its overflow is not empty
    +            if (queue.isEmptyOverflow()) {
    +                if (queue.tryPublish(tuple)) {
    +                    continue;
                     }
    -                remoteMap.get(destTask).add(new TaskMessage(destTask, serializer.serialize(addressedTuple.getTuple())));
                 }
    -        }
     
    -        if (!local.isEmpty()) {
    -            transferLocal(local);
    -        }
    -        if (!remoteMap.isEmpty()) {
    -            transferQueue.publish(remoteMap);
    -        }
    -    }
    +            // 2- BP detected (i.e MainQ is full). So try adding to overflow
    +            int currOverflowCount = queue.getOverflowCount();
    +            if (bpTracker.recordBackPressure(tuple.dest, queue)) {
    +                receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
    +                lastOverflowCount = currOverflowCount;
    +            } else {
     
    -    // TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
    -    public void sendTuplesToRemoteWorker(HashMap<Integer, ArrayList<TaskMessage>> packets, long seqId, boolean batchEnd) {
    -        drainer.add(packets);
    -        if (batchEnd) {
    -            ReentrantReadWriteLock.ReadLock readLock = endpointSocketLock.readLock();
    -            try {
    -                readLock.lock();
    -                drainer.send(cachedTaskToNodePort.get(), cachedNodeToPortSocket.get());
    -            } finally {
    -                readLock.unlock();
    +                if (currOverflowCount - lastOverflowCount > 10000) {
    +                    // resend BP status, in case prev notification was missed or reordered
    +                    BackPressureStatus bpStatus = bpTracker.getCurrStatus();
    +                    receiver.sendBackPressureStatus(bpStatus);
    +                    lastOverflowCount = currOverflowCount;
    +                    LOG.debug("Re-sent BackPressure Status. OverflowCount = {}, BP Status ID = {}. ", currOverflowCount, bpStatus.id);
    +                }
    +            }
    +            if (!queue.tryPublishToOverflow(tuple)) {
    +                dropMessage(tuple, queue);
                 }
    -            drainer.clear();
             }
         }
     
    +    private void dropMessage(AddressedTuple tuple, JCQueue queue) {
    +        ++dropCount;
    +        queue.recordMsgDrop();
    +        LOG.warn("Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}", queue.getName(), queue.getOverflowCount(), dropCount, tuple.toString());
    --- End diff --
    
    nit: `tuple.toString()` is not needed, as the logging will do it for you, but since this is a warn log that should almost never be called it really does not matter.
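To illustrate the point with java.util.logging standing in for SLF4J (the demo class is hypothetical): when the object itself is passed as a parameter, the logger checks the level first and only calls `toString()` if the message is actually formatted, whereas `tuple.toString()` in the argument list runs before the logger is even called.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical demo: count how often toString() runs when the log level is disabled.
final class LazyToStringDemo {
    static final class CountingTuple {
        int toStringCalls = 0;

        @Override
        public String toString() {
            toStringCalls++;
            return "tuple";
        }
    }

    static int callsWhenDisabled(boolean eager) {
        Logger log = Logger.getLogger("lazy-tostring-demo");
        log.setLevel(Level.SEVERE); // WARNING is below SEVERE, so warnings are disabled
        CountingTuple t = new CountingTuple();
        if (eager) {
            // toString() is evaluated as an argument, before the level check
            log.log(Level.WARNING, "Dropped Message : {0}", t.toString());
        } else {
            // the logger returns early on the level check; toString() never runs
            log.log(Level.WARNING, "Dropped Message : {0}", t);
        }
        return t.toStringCalls;
    }
}
```

As the comment says, for a warn-level message that should almost never fire the difference is negligible; it only matters on hot paths.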


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167383283
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Acker.java ---
    @@ -99,17 +100,19 @@ public void execute(Tuple input) {
                 pending.put(id, curr);
             } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
                 resetTimeout = true;
    -            if (curr != null) {
    +            if (curr == null) {
    --- End diff --
    
    Unintentional. This is most likely due to a mistake made while resolving merge conflicts when I rebased my code onto master sometime in Dec 2017. Will revert. Thanks for catching it.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r164955129
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import org.apache.storm.messaging.netty.BackPressureStatus;
    +import org.apache.storm.utils.JCQueue;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import static org.apache.storm.Constants.SYSTEM_TASK_ID;
    +
    +public class BackPressureTracker {
    +    static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
    +
    +    private final Map<Integer, JCQueue> bpTasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
    +    private final Set<Integer> nonBpTasks = ConcurrentHashMap.newKeySet();
    +    private final String workerId;
    +
    +    public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
    +        this.workerId = workerId;
    +        this.nonBpTasks.addAll(allLocalTasks);    // all tasks are considered to be not under BP initially
    +        this.nonBpTasks.remove((int)SYSTEM_TASK_ID);   // not tracking system task
    +    }
    +
    +    /* called by transferLocalBatch() on NettyWorker thread
    +     * returns true if an update was recorded, false if taskId is already under BP
    +     */
    +    public boolean recordBackpressure(Integer taskId, JCQueue recvQ) {
    +        if (nonBpTasks.remove(taskId)) {
    +            bpTasks.put(taskId, recvQ);
    +            return true;
    +        }
    +        return false;
    +    }
    +
    +    // returns true if there was a change in the BP situation
    +    public boolean refreshBpTaskList() {
    +        boolean changed = false;
    +        LOG.debug("Running Back Pressure status change check");
    +        for (Iterator<Entry<Integer, JCQueue>> itr = bpTasks.entrySet().iterator(); itr.hasNext(); ) {
    +            Entry<Integer, JCQueue> entry = itr.next();
    +            if (entry.getValue().isEmptyOverflow()) {
    +                // move task from bpTasks to noBpTasks
    +                nonBpTasks.add(entry.getKey());
    +                itr.remove();
    --- End diff --
    
    Next update will simplify the logic by using a single map and fix this issue as well.
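A hypothetical sketch of what a single-map tracker might look like. It assumes the issue is the window in the two-collection version where `refreshBpTaskList()` has added a task back to `nonBpTasks` but not yet removed it from `bpTasks`: if `recordBackpressure()` runs in that window on the NettyWorker thread, the iterator's `remove()` can then drop the task from both collections. Here each task has one entry with an `AtomicBoolean` flag, so every transition is a single compare-and-set. `OverflowView` stands in for the part of `JCQueue` the tracker uses; neither name comes from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the JCQueue method the tracker needs.
interface OverflowView {
    boolean isEmptyOverflow();
}

// Hypothetical sketch: one map entry per local task instead of two collections.
final class SingleMapBpTracker {
    private static final class TaskState {
        final OverflowView queue;
        final AtomicBoolean underBp = new AtomicBoolean(false);

        TaskState(OverflowView queue) {
            this.queue = queue;
        }
    }

    private final Map<Integer, TaskState> tasks = new ConcurrentHashMap<>();

    SingleMapBpTracker(Map<Integer, OverflowView> localQueues) {
        localQueues.forEach((id, q) -> tasks.put(id, new TaskState(q)));
    }

    // Called when a task's receive queue is full. Returns true only on the
    // transition from "not under BP" to "under BP"; unknown tasks are ignored.
    boolean recordBackPressure(int taskId) {
        TaskState s = tasks.get(taskId);
        return s != null && s.underBp.compareAndSet(false, true);
    }

    // Clears BP for tasks whose overflow has drained; returns true if any task changed.
    // The entry is never removed, so a concurrent recordBackPressure() cannot lose it.
    boolean refreshBpTaskList() {
        boolean changed = false;
        for (TaskState s : tasks.values()) {
            if (s.underBp.get() && s.queue.isEmptyOverflow() && s.underBp.compareAndSet(true, false)) {
                changed = true;
            }
        }
        return changed;
    }

    List<Integer> bpTasks() {
        List<Integer> out = new ArrayList<>();
        tasks.forEach((id, s) -> {
            if (s.underBp.get()) {
                out.add(id);
            }
        });
        return out;
    }
}
```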


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159968772
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/RunningAvg.java ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.utils;
    +
    +public class RunningAvg {
    +
    +    private long n = 0;
    +    private double oldM, newM, oldS, newS;
    +    private String name;
    +    public static int printFreq = 20_000_000;
    +    private boolean disable;
    +    private long count = 0;
    +
    +    public RunningAvg(String name, boolean disable) {
    +        this(name, printFreq, disable);
    +    }
    +
    +    public RunningAvg(String name, int printFreq) {
    +        this(name, printFreq, false);
    +    }
    +
    +    public RunningAvg(String name, int printFreq, boolean disable) {
    +        this.name = name + "_" + Thread.currentThread().getName();
    +        this.printFreq = printFreq;
    +        this.disable = disable;
    +    }
    +
    +    public void clear() {
    +        n = 0;
    +    }
    +
    +    public void pushLatency(long startMs) {
    +        push(System.currentTimeMillis() - startMs);
    +    }
    +
    +    public void push(long x) {
    +        if (disable) {
    +            return;
    +        }
    +
    +        n++;
    +
    +        if (n == 1) {
    +            oldM = newM = x;
    +            oldS = 0;
    +        } else {
    +            newM = oldM + (x - oldM) / n;
    +            newS = oldS + (x - oldM) * (x - newM);
    +
    +            // set up for next iteration
    +            oldM = newM;
    +            oldS = newS;
    +        }
    +        if (++count == printFreq) {
    +            System.err.printf("  ***> %s - %,.2f\n", name, mean());
    --- End diff --
    
    Can we log this instead of printing it? Also, could you file a follow-on JIRA so that, when we move to a better metrics implementation, we use that instead of printing/logging?
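A sketch of the suggested change, assuming the intent is Welford's online mean/variance update (which the `oldM`/`newM`/`oldS`/`newS` recurrence above appears to implement) reported through a logger rather than `System.err`. `RunningStats` is a hypothetical name, java.util.logging stands in for SLF4J, and `printFreq` is an instance field here; in the diff it is `static` but assigned in the constructor, so each new instance overwrites the value for all the others.

```java
import java.util.logging.Logger;

// Hypothetical sketch: Welford's online mean/variance, reported via a logger.
final class RunningStats {
    private static final Logger LOG = Logger.getLogger(RunningStats.class.getName());

    private final String name;
    private final long printFreq; // per instance; <= 0 disables periodic logging
    private long n = 0;
    private double mean = 0.0;
    private double m2 = 0.0;      // sum of squared deviations from the running mean

    RunningStats(String name, long printFreq) {
        this.name = name;
        this.printFreq = printFreq;
    }

    void push(long x) {
        n++;
        double delta = x - mean;
        mean += delta / n;
        m2 += delta * (x - mean); // uses the updated mean, as in Welford's update
        if (printFreq > 0 && n % printFreq == 0) {
            // Supplier form: the message is only built if INFO is enabled
            LOG.info(() -> String.format("%s - mean=%,.2f", name, mean));
        }
    }

    double mean() {
        return mean;
    }

    // Sample variance; 0 until at least two values have been pushed.
    double variance() {
        return n > 1 ? m2 / (n - 1) : 0.0;
    }

    long count() {
        return n;
    }
}
```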


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159969789
  
    --- Diff: storm-server/src/main/java/org/apache/storm/Testing.java ---
    @@ -712,6 +712,6 @@ public static Tuple testTuple(List<Object> values, MkTupleParam param) {
                     new HashMap<>(),
                     new HashMap<>(),
                     new AtomicBoolean(false));
    -        return new TupleImpl(context, values, 1, stream);
    +        return new TupleImpl(context, values, "component", 1, stream);
    --- End diff --
    
    I think this is the cause of some of the test failures.


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    It looks like the storm-core failure is related to a worker crash while shutting down.
    
    https://travis-ci.org/apache/storm/jobs/336372156
    
    ```
    36913 [SLOT_1027] INFO  o.a.s.e.ExecutorShutdown - Shut down executor 2:[3, 3]
    36913 [SLOT_1027] INFO  o.a.s.e.ExecutorShutdown - Shutting down executor 19d8dc30-7afc-44b1-a466-26064b50a580:[2, 2]
    36913 [Thread-334-19d8dc30-7afc-44b1-a466-26064b50a580-executor[2, 2]] INFO  o.a.s.u.Utils - Async loop interrupted!
    36914 [SLOT_1027] INFO  o.a.s.e.ExecutorShutdown - Shut down executor 19d8dc30-7afc-44b1-a466-26064b50a580:[2, 2]
    36914 [SLOT_1027] INFO  o.a.s.e.ExecutorShutdown - Shutting down executor 1:[1, 1]
    36914 [Thread-335-1-executor[1, 1]] INFO  o.a.s.u.Utils - Async loop interrupted!
    36914 [SLOT_1027] INFO  o.a.s.e.ExecutorShutdown - Shut down executor 1:[1, 1]
    36914 [SLOT_1027] INFO  o.a.s.d.w.Worker - Shut down executors
    36914 [SLOT_1027] INFO  o.a.s.d.w.Worker - Shutting down transfer thread
    36914 [Worker-Transfer] ERROR o.a.s.u.Utils - Async loop died!
    java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.storm.messaging.TaskMessage
    	at org.apache.storm.daemon.worker.WorkerTransfer.accept(WorkerTransfer.java:84) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:309) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at org.apache.storm.utils.JCQueue.consume(JCQueue.java:290) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at org.apache.storm.utils.JCQueue.consume(JCQueue.java:281) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at org.apache.storm.daemon.worker.WorkerTransfer.lambda$makeTransferThread$0(WorkerTransfer.java:75) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at org.apache.storm.utils.Utils$2.run(Utils.java:350) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
    36914 [Worker-Transfer] ERROR o.a.s.u.Utils - Async loop died!
    java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.storm.messaging.TaskMessage
    	at org.apache.storm.utils.Utils$2.run(Utils.java:363) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
    Caused by: java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.storm.messaging.TaskMessage
    	at org.apache.storm.daemon.worker.WorkerTransfer.accept(WorkerTransfer.java:84) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:309) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at org.apache.storm.utils.JCQueue.consume(JCQueue.java:290) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at org.apache.storm.utils.JCQueue.consume(JCQueue.java:281) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at org.apache.storm.daemon.worker.WorkerTransfer.lambda$makeTransferThread$0(WorkerTransfer.java:75) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at org.apache.storm.utils.Utils$2.run(Utils.java:350) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	... 1 more
    36915 [Worker-Transfer] ERROR o.a.s.u.Utils - Halting process: Async loop died!
    java.lang.RuntimeException: Halting process: Async loop died!
    	at org.apache.storm.utils.Utils.exitProcess(Utils.java:465) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at org.apache.storm.utils.Utils$3.uncaughtException(Utils.java:373) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    	at java.lang.Thread.dispatchUncaughtException(Thread.java:1959) [?:1.8.0_151]
    ```


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159957697
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---
    @@ -155,134 +150,159 @@ public void start() throws Exception {
     
             Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
                 @Override public Object run() throws Exception {
    -                workerState =
    -                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
    +                return loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials);
    +            }
    +        }); // Subject.doAs(...)
    +
    +    }
    +
    +    private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState, Map<String, String> initCreds, Credentials initialCredentials)
    +            throws Exception {
    +        workerState =
    +                new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
                             stormClusterState);
     
    -                // Heartbeat here so that worker process dies if this fails
    -                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    -                // that worker is running and moves on
    -                doHeartBeat();
    +        // Heartbeat here so that worker process dies if this fails
    +        // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    +        // that worker is running and moves on
    +        doHeartBeat();
     
    -                executorsAtom = new AtomicReference<>(null);
    +        executorsAtom = new AtomicReference<>(null);
     
    -                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    -                // to the supervisor
    -                workerState.heartbeatTimer
    -                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    -                        try {
    -                            doHeartBeat();
    -                        } catch (IOException e) {
    -                            throw new RuntimeException(e);
    -                        }
    -                    });
    +        // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    +        // to the supervisor
    +        workerState.heartbeatTimer
    +                .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
    +                    try {
    +                        doHeartBeat();
    +                    } catch (IOException e) {
    +                        throw new RuntimeException(e);
    +                    }
    +                });
     
    -                workerState.executorHeartbeatTimer
    -                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
    +        workerState.executorHeartbeatTimer
    +                .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
                             Worker.this::doExecutorHeartbeats);
     
    -                workerState.registerCallbacks();
    +        workerState.registerCallbacks();
     
    -                workerState.refreshConnections(null);
    +        workerState.refreshConnections(null);
     
    -                workerState.activateWorkerWhenAllConnectionsReady();
    +        workerState.activateWorkerWhenAllConnectionsReady();
     
    -                workerState.refreshStormActive(null);
    +        workerState.refreshStormActive(null);
     
    -                workerState.runWorkerStartHooks();
    +        workerState.runWorkerStartHooks();
     
    -                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    -                for (List<Long> e : workerState.getExecutors()) {
    -                    if (ConfigUtils.isLocalMode(topologyConf)) {
    -                        newExecutors.add(
    -                            LocalExecutor.mkExecutor(workerState, e, initCreds)
    -                                .execute());
    -                    } else {
    -                        newExecutors.add(
    -                            Executor.mkExecutor(workerState, e, initCreds)
    -                                .execute());
    -                    }
    -                }
    -                executorsAtom.set(newExecutors);
    +        List<Executor> execs = new ArrayList<>();
    +        for (List<Long> e : workerState.getExecutors()) {
    +            if (ConfigUtils.isLocalMode(topologyConf)) {
    +                Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
    +                execs.add( executor );
    +                workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
    +            } else {
    +                Executor executor = Executor.mkExecutor(workerState, e, initCreds);
    +                workerState.localReceiveQueues.put(executor.getTaskIds().get(0), executor.getReceiveQueue());
    +                execs.add(executor);
    +            }
    +        }
     
    -                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
    -                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);
    +        List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
    +        for (Executor executor : execs) {
    +            newExecutors.add(executor.execute());
    +        }
    +        executorsAtom.set(newExecutors);
     
    -                // This thread will publish the messages destined for remote tasks to remote connections
    -                transferThread = Utils.asyncLoop(() -> {
    -                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
    -                    return 0L;
    -                });
    +        // This thread will send out messages destined for remote tasks (on other workers)
    +        transferThread = workerState.makeTransferThread();
    +        transferThread.setName("Worker-Transfer");
     
    -                DisruptorBackpressureCallback disruptorBackpressureHandler =
    -                    mkDisruptorBackpressureHandler(workerState);
    -                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
    -                workerState.transferQueue
    -                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
    -                workerState.transferQueue
    -                    .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
    -                workerState.transferQueue
    -                    .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));
    -
    -                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler();
    -                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
    -                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    -                    backpressureThread.start();
    -                    stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle);
    -                    
    -                    int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
    -                    workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
    -                }
    +        credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
     
    -                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
    +        establishLogSettingCallback();
     
    -                establishLogSettingCallback();
    +        workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
     
    -                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);
    +        workerState.refreshCredentialsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
    +                    @Override public void run() {
    +                        checkCredentialsChanged();
    +                    }
    +                });
     
    -                workerState.refreshCredentialsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
    -                        @Override public void run() {
    -                            checkCredentialsChanged();
    -                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
    -                               checkThrottleChanged();
    -                            }
    +        workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    +                (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
    +                    @Override public void run() {
    +                        try {
    +                            LOG.debug("Checking if blobs have updated");
    +                            updateBlobUpdates();
    +                        } catch (IOException e) {
    +                            // IOException from reading the version files to be ignored
    +                            LOG.error(e.getStackTrace().toString());
                             }
    -                    });
    -
    -                workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
    -                        (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
    -                            @Override public void run() {
    -                                try {
    -                                    LOG.debug("Checking if blobs have updated");
    -                                    updateBlobUpdates();
    -                                } catch (IOException e) {
    -                                    // IOException from reading the version files to be ignored
    -                                    LOG.error(e.getStackTrace().toString());
    -                                }
    -                            }
    -                        });
    -
    -                // The jitter allows the clients to get the data at different times, and avoids thundering herd
    -                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    -                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
    -                }
    +                    }
    +                });
    +
    +        // The jitter allows the clients to get the data at different times, and avoids thundering herd
    +        if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
    +            workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, workerState::refreshLoad);
    +        }
    +
    +        workerState.refreshConnectionsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
     
    -                workerState.refreshConnectionsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);
    +        workerState.resetLogLevelsTimer.scheduleRecurring(0,
    +                (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
     
    -                workerState.resetLogLevelsTimer.scheduleRecurring(0,
    -                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);
    +        workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    +                workerState::refreshStormActive);
     
    -                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
    -                    workerState::refreshStormActive);
    +        setupFlushTupleTimer(topologyConf, newExecutors);
    +        setupBackPressureCheckTimer(topologyConf);
    +
    +        LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
    --- End diff --
    
    nit: I think we log this elsewhere too, so it might be good to remove this.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r168033466
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import org.apache.storm.serialization.KryoValuesDeserializer;
    +import org.apache.storm.serialization.KryoValuesSerializer;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
    +public class BackPressureStatus {
    +    public static final short IDENTIFIER = (short)-600;
    +    private static final int SIZE_OF_ID = 2; // size if IDENTIFIER
    +    private static final int SIZE_OF_INT = 4;
    +
    +    private static AtomicLong bpCount = new AtomicLong(0);
    +
    +    public String workerId;
    +    public final long id;                       // monotonically increasing id
    --- End diff --
    
    OK, thanks for the clarification. I thought it was some kind of guarantee we needed to ensure. It may be better to state clearly that it is not a requirement and is only for debugging purposes.
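    For illustration, here is one way the field could be documented as a debugging aid only. `BackPressureStatusSketch` is a hypothetical, trimmed-down stand-in for the class in the diff, not the PR's code. It assumes the id is used only to correlate log lines.
    
    ```java
    import java.util.concurrent.atomic.AtomicLong;
    
    // Hypothetical, trimmed-down stand-in for BackPressureStatus (not the PR's code).
    final class BackPressureStatusSketch {
        private static final AtomicLong bpCount = new AtomicLong(0);
    
        final String workerId;
        // Debugging aid only: unique and increasing within one worker JVM so log
        // lines can be correlated. Receivers must not rely on it for ordering or
        // de-duplication; it restarts when the worker restarts and is not
        // comparable across workers.
        final long id;
    
        BackPressureStatusSketch(String workerId) {
            this.workerId = workerId;
            this.id = bpCount.incrementAndGet();
        }
    
        @Override
        public String toString() {
            return "{worker=" + workerId + ", bpStatusId=" + id + "}";
        }
    }
    ```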


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r161361184
  
    --- Diff: storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java ---
    @@ -24,50 +24,46 @@
     import org.apache.storm.task.GeneralTopologyContext;
     
     public class TupleImpl implements Tuple {
    -    private final List<Object> values;
    -    private final int taskId;
    -    private final String streamId;
    -    private final GeneralTopologyContext context;
    -    private final MessageId id;
    +    private List<Object> values;
    +    private int taskId;
    +    private String streamId;
    +    private GeneralTopologyContext context;
    +    private MessageId id;
    +    private final String srcComponent;
         private Long _processSampleStartTime;
         private Long _executeSampleStartTime;
         private long _outAckVal = 0;
    -    
    +
         public TupleImpl(Tuple t) {
             this.values = t.getValues();
             this.taskId = t.getSourceTask();
             this.streamId = t.getSourceStreamId();
             this.id = t.getMessageId();
             this.context = t.getContext();
    -        if (t instanceof TupleImpl) {
    +        this.srcComponent = t.getSourceComponent();
    +        try {
                 TupleImpl ti = (TupleImpl) t;
                 this._processSampleStartTime = ti._processSampleStartTime;
                 this._executeSampleStartTime = ti._executeSampleStartTime;
                 this._outAckVal = ti._outAckVal;
    +        } catch (ClassCastException e) {
    +            // ignore ... if t is not a TupleImpl type .. faster than checking and then casting
             }
         }
     
    -    public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
    +    public TupleImpl(GeneralTopologyContext context, List<Object> values, String srcComponent, int taskId, String streamId, MessageId id) {
             this.values = Collections.unmodifiableList(values);
             this.taskId = taskId;
             this.streamId = streamId;
             this.id = id;
             this.context = context;
    -        
    -        String componentId = context.getComponentId(taskId);
    -        Fields schema = context.getComponentOutputFields(componentId, streamId);
    -        if(values.size()!=schema.size()) {
    -            throw new IllegalArgumentException(
    -                    "Tuple created with wrong number of fields. " +
    -                    "Expected " + schema.size() + " fields but got " +
    -                    values.size() + " fields");
    -        }
    --- End diff --
    
    OK, enabling it for local mode. I'm doing my best to eliminate map lookups in the critical path. 
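    The following sketch shows the trade-off being described. It assumes the flag is computed once, for example from `ConfigUtils.isLocalMode(topoConf)`, which the Worker diff in this PR already calls, so that distributed mode skips the schema lookup entirely. `TupleValuesSketch` and its `streamId -> field count` map are hypothetical simplifications of the `GeneralTopologyContext` lookup in the removed code.
    
    ```java
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    
    // Hypothetical helper: the real check lived in TupleImpl's constructor.
    final class TupleValuesSketch {
        private final boolean validateFieldCount;           // e.g. true only in local mode
        private final Map<String, Integer> fieldsPerStream; // streamId -> expected field count
    
        TupleValuesSketch(boolean validateFieldCount, Map<String, Integer> fieldsPerStream) {
            this.validateFieldCount = validateFieldCount;
            this.fieldsPerStream = fieldsPerStream;
        }
    
        List<Object> makeValues(String streamId, List<Object> values) {
            // When validation is off, the hot path pays for one boolean test and no map lookup.
            if (validateFieldCount) {
                Integer expected = fieldsPerStream.get(streamId);
                // Unknown streams are not checked in this sketch.
                if (expected != null && values.size() != expected) {
                    throw new IllegalArgumentException("Tuple created with wrong number of fields. Expected "
                            + expected + " fields but got " + values.size() + " fields");
                }
            }
            return Collections.unmodifiableList(values);
        }
    }
    ```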


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r164956555
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java ---
    @@ -17,72 +17,124 @@
      */
     package org.apache.storm.executor;
     
    -import com.google.common.annotations.VisibleForTesting;
    -import com.lmax.disruptor.EventHandler;
     import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
     import org.apache.storm.serialization.KryoTupleSerializer;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.tuple.Tuple;
    -import org.apache.storm.utils.DisruptorQueue;
    -import org.apache.storm.utils.MutableObject;
    +import org.apache.storm.utils.JCQueue;
     import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Utils;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.ArrayList;
    +import java.util.Collections;
     import java.util.Map;
    -import java.util.concurrent.Callable;
    +import java.util.Queue;
     
    -public class ExecutorTransfer implements EventHandler, Callable {
    +// Every executor has an instance of this class
    +public class ExecutorTransfer  {
         private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
     
         private final WorkerState workerData;
    -    private final DisruptorQueue batchTransferQueue;
    -    private final Map<String, Object> topoConf;
         private final KryoTupleSerializer serializer;
    -    private final MutableObject cachedEmit;
         private final boolean isDebug;
    +    private final int producerBatchSz;
    +    private int remotesBatchSz = 0;
    +    private int indexingBase = 0;
    +    private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker
    +    private ArrayList<JCQueue> queuesToFlush; // [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance
     
    -    public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map<String, Object> topoConf) {
    +
    +    public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
             this.workerData = workerData;
    -        this.batchTransferQueue = batchTransferQueue;
    -        this.topoConf = topoConf;
             this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
    -        this.cachedEmit = new MutableObject(new ArrayList<>());
             this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
    +        this.producerBatchSz = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
    +    }
    +
    +    // to be called after all Executor objects in the worker are created and before this object is used
    +    public void initLocalRecvQueues() {
    +        Integer minTaskId = workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
    +        this.localReceiveQueues = Utils.convertToArray( workerData.getLocalReceiveQueues(), minTaskId);
    +        this.indexingBase = minTaskId;
    +        this.queuesToFlush = new ArrayList<JCQueue>(Collections.nCopies(localReceiveQueues.size(), null) );
         }
     
    -    public void transfer(int task, Tuple tuple) {
    -        AddressedTuple val = new AddressedTuple(task, tuple);
    +    // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
    +    public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
             if (isDebug) {
    -            LOG.info("TRANSFERRING tuple {}", val);
    +            LOG.info("TRANSFERRING tuple {}", addressedTuple);
    +        }
    +
    +        JCQueue localQueue = getLocalQueue(addressedTuple);
    +        if (localQueue!=null) {
    +            return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
    +        }  else  {
    +            if (remotesBatchSz >= producerBatchSz) {
    +                if ( !workerData.tryFlushRemotes() ) {
    +                    if (pendingEmits != null) {
    +                        pendingEmits.add(addressedTuple);
    +                    }
    +                    return false;
    +                }
    +                remotesBatchSz = 0;
    --- End diff --
    
    @revans2 
    There is one instance of the ExecutorTransfer object per Executor, so the only way to invoke it concurrently is if:
    - the spout/bolt spins up its own background threads, and
    - those threads call outputcollector.emit() without any external synchronization.
    
    Is that the situation you were thinking of?
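    For reference, the remote-batching branch quoted above reduces to the following single-threaded model. This is a hypothetical simplification, not the PR's code. A `BooleanSupplier` stands in for `workerData.tryFlushRemotes()`, and a `List` stands in for the remote transfer queue. The quoted diff ends at `remotesBatchSz = 0;`, so the steps after it (enqueue, then count) are assumed. Because `remotesBatchSz` is a plain `int`, two threads calling this at once could read the same count and lose an increment, which is the race under discussion.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.function.BooleanSupplier;
    
    // Simplified, hypothetical model of ExecutorTransfer's remote-batching branch.
    // NOT thread safe: remotesBatchSz is a plain int, mirroring the original, so
    // each instance must be used by a single thread (or guarded externally).
    final class RemoteBatcherSketch<T> {
        private final int producerBatchSz;
        private final BooleanSupplier tryFlushRemotes; // stands in for workerData.tryFlushRemotes()
        final List<T> outbound = new ArrayList<>();    // stands in for the remote transfer queue
        private int remotesBatchSz = 0;
    
        RemoteBatcherSketch(int producerBatchSz, BooleanSupplier tryFlushRemotes) {
            this.producerBatchSz = producerBatchSz;
            this.tryFlushRemotes = tryFlushRemotes;
        }
    
        boolean tryTransferRemote(T tuple, Queue<T> pendingEmits) {
            if (remotesBatchSz >= producerBatchSz) {
                // Batch is full: try to flush before accepting more.
                if (!tryFlushRemotes.getAsBoolean()) {
                    if (pendingEmits != null) {
                        pendingEmits.add(tuple); // caller retries later
                    }
                    return false;
                }
                remotesBatchSz = 0;
            }
            // Assumption: the elided remainder enqueues the tuple and counts it.
            outbound.add(tuple);
            remotesBatchSz++;
            return true;
        }
    }
    ```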



---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159963018
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/TupleInfo.java ---
    @@ -23,7 +23,7 @@
     import java.io.Serializable;
     import java.util.List;
     
    -public class TupleInfo implements Serializable {
    +public final class TupleInfo implements Serializable {
    --- End diff --
    
    Why do we want this to be final?  Just curious.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167330523
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import org.apache.storm.serialization.KryoValuesDeserializer;
    +import org.apache.storm.serialization.KryoValuesSerializer;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
    +public class BackPressureStatus implements java.io.Serializable {
    --- End diff --
    
    Why does this need to be Serializable if we are explicitly using Kryo for all of the serialization? Can we just register the class with Kryo instead, so this will also work when Java serialization is disabled?


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r161359459
  
    --- Diff: storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java ---
    @@ -20,6 +20,7 @@
     import org.apache.storm.Config;
     import org.apache.storm.generated.ComponentCommon;
     import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.messaging.netty.BackPressureStatus;
    --- End diff --
    
    fixed.


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    Hi, thanks for the great patch.
    
    But I ran into some problems.
    
    I ran
    ```
    bin/storm jar storm-loadgen-*.jar org.apache.storm.loadgen.ThroughputVsLatency --spouts 1 --splitters 2 --counters 1 -c topology.debug=true
    ```
    on the ResourceAwareScheduler, and it's not working properly. 
    
    It looks like the __acker-executor was not able to receive messages from the spouts and bolts, which kept retrying sending messages to the acker. This in turn led to another problem:
    https://issues.apache.org/jira/browse/STORM-2970
    
    I ran it on the Storm code right before this merge and it worked properly. On the code right after this merge, the issue appears.
    
    Could you please verify? Thanks!



---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r168033940
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
    @@ -22,19 +22,23 @@
     import org.apache.storm.executor.TupleInfo;
     import org.apache.storm.spout.ISpout;
     import org.apache.storm.spout.ISpoutOutputCollector;
    +import org.apache.storm.tuple.AddressedTuple;
     import org.apache.storm.tuple.MessageId;
     import org.apache.storm.tuple.TupleImpl;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.apache.storm.utils.MutableLong;
     import org.apache.storm.utils.RotatingMap;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.util.ArrayList;
     import java.util.List;
     import java.util.Random;
     
    +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
    --- End diff --
    
    To nail down and document the semantics of concurrent emits, I have opened [STORM-2945](https://issues.apache.org/jira/browse/STORM-2945).
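    Until those semantics are settled, the comment in the diff implies that user code emitting from its own background threads must serialize those calls. The sketch below shows that pattern. It assumes a hypothetical `Emitter` interface as a stand-in for the collector, so it compiles without Storm on the classpath.
    
    ```java
    import java.util.List;
    
    // Hypothetical stand-in for the collector's emit(); not thread safe by contract.
    interface Emitter {
        void emit(List<Object> values);
    }
    
    // Serializes all emits through one lock so that a non-thread-safe collector
    // (and the per-executor transfer state behind it) only ever sees one caller.
    final class SynchronizedEmitter implements Emitter {
        private final Emitter delegate;
        private final Object lock = new Object();
    
        SynchronizedEmitter(Emitter delegate) {
            this.delegate = delegate;
        }
    
        @Override
        public void emit(List<Object> values) {
            synchronized (lock) {
                delegate.emit(values);
            }
        }
    }
    ```
    
    Every thread that emits, including the executor thread itself, would have to go through the same wrapper instance. Mixing wrapped and unwrapped calls would reintroduce the race.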


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    @roshannaik Thanks for looking into this. I used the default configs on a secure cluster with the ResourceAwareScheduler.


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    **Update:** 
    - The code has been rebased onto the latest master. 
    - I'm getting clean test runs on my local machine.
    - The Travis runs are showing these test failures:
    
    1.  **storm-client:JCQueueTest** - fails on Travis but works for me locally. I will look into it.
    2.  **storm-core** - The exact test and the nature of the failure are not clear from the Travis logs. I see some exceptions mentioning MetricsStore ... this might not be related to this PR.
    3.  **org.apache.storm.cassandra.trident.MapStateTest** - seems to be stalling, which causes a failure. 
    
    The cause of the last two failures is unclear to me, as they pass on my local machine and I can't see anything interesting in the Travis logs.
    
    @revans2 : I still have that open question for you regarding the race condition you brought up on ExecutorTransfer::remotesBatchSz. Everything else has been addressed.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167286543
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -890,30 +871,91 @@
         public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines";
     
         /**
    -     * Configure timeout milliseconds used for disruptor queue wait strategy. Can be used to tradeoff latency
    -     * vs. CPU usage
    +     * Selects the Bolt's Wait Strategy to use when there are no incoming msgs. Used to trade off latency vs CPU usage.
    +     */
    +    @isString
    --- End diff --
    
    Can we check if this is an instance of the proper parent interface?
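    
    A minimal sketch of that check, assuming the config value is a fully qualified class name: load it with `Class.forName` and verify it with `Class.isAssignableFrom` against the expected parent interface. `WaitStrategy`, `ProgressiveWaitStrategy`, and the `topology.bolt.wait.strategy` key used in `main` are hypothetical stand-ins, not necessarily Storm's actual names.
    
    ```java
    import java.util.Map;
    
    // Hypothetical stand-ins for the real wait-strategy interface and an implementation.
    interface WaitStrategy {
        void idle(int idleCounter) throws InterruptedException;
    }
    
    class ProgressiveWaitStrategy implements WaitStrategy {
        @Override
        public void idle(int idleCounter) throws InterruptedException {
            Thread.sleep(1);
        }
    }
    
    public class WaitStrategyConfigCheck {
        /** Throws IllegalArgumentException unless conf[key] names a loadable class implementing 'parent'. */
        public static void validateImplements(Map<String, Object> conf, String key, Class<?> parent) {
            Object value = conf.get(key);
            if (!(value instanceof String)) {
                throw new IllegalArgumentException(key + " must be a class name string, got: " + value);
            }
            Class<?> clazz;
            try {
                clazz = Class.forName((String) value);
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException(key + " refers to unknown class " + value, e);
            }
            // isAssignableFrom is true when clazz is parent itself or implements/extends it.
            if (!parent.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(key + ": " + value + " does not implement " + parent.getName());
            }
        }
    
        public static void main(String[] args) {
            String key = "topology.bolt.wait.strategy"; // assumed key name, for illustration only
            validateImplements(Map.of(key, ProgressiveWaitStrategy.class.getName()), key, WaitStrategy.class);
            try {
                validateImplements(Map.of(key, "java.lang.String"), key, WaitStrategy.class);
            } catch (IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }
    ```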


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167282875
  
    --- Diff: docs/Concepts.md ---
    @@ -113,3 +113,8 @@ Topologies execute across one or more worker processes. Each worker process is a
     **Resources:**
     
     * [Config.TOPOLOGY_WORKERS](javadocs/org/apache/storm/Config.html#TOPOLOGY_WORKERS): this config sets the number of workers to allocate for executing the topology
    +
    +### Performance Tuning
    +
    +Refer to [performance tuning guide](docs/CONTRIBUTING.md)
    --- End diff --
    
    I don't think `CONTRIBUTING.md` is the performance tuning guide. Also, the path is relative, and since `Performance.md` is in the same directory as this file, we don't need the `docs/` prefix in the link.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167388018
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import org.apache.storm.serialization.KryoValuesDeserializer;
    +import org.apache.storm.serialization.KryoValuesSerializer;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
    +public class BackPressureStatus implements java.io.Serializable {
    --- End diff --
    
    Hmm, interesting. 
    I wonder why ControlMessage, MessageBatch & SaslMessageToken don't follow the same pattern of registering?
    
    Can you confirm that you are suggesting the following steps?
    - Remove the inheritance from the Serializable interface
    - Register BackPressureStatus.class with Kryo
    - Remove the BackPressureStatus.buffer() and BackPressureStatus.read() methods?


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167800249
  
    --- Diff: storm-client/src/jvm/org/apache/storm/trident/spout/RichSpoutBatchExecutor.java ---
    @@ -194,7 +194,12 @@ public void reportError(Throwable t) {
             public void emitDirect(int task, String stream, List<Object> values, Object id) {
                 throw new UnsupportedOperationException("Trident does not support direct streams");
             }
    -        
    +
    +        @Override
    +        public void flush() {
    +            //NOOP   //TODO: Roshan: validate if this is OK
    --- End diff --
    
    needs to flush.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r161358387
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/metrics/SpoutThrottlingMetrics.java ---
    @@ -22,24 +22,25 @@
     
     public class SpoutThrottlingMetrics extends BuiltinMetrics {
         private final CountMetric skippedMaxSpoutMs = new CountMetric();
    -    private final CountMetric skippedThrottleMs = new CountMetric();
         private final CountMetric skippedInactiveMs = new CountMetric();
    +    private final CountMetric skippedBackPressureMs = new CountMetric();
     
         public SpoutThrottlingMetrics() {
             metricMap.put("skipped-max-spout-ms", skippedMaxSpoutMs);
    -        metricMap.put("skipped-throttle-ms", skippedThrottleMs);
             metricMap.put("skipped-inactive-ms", skippedInactiveMs);
    +        metricMap.put("skipped-backpressure-ms", skippedBackPressureMs);
    --- End diff --
    
    fixed


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r164971328
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java ---
    @@ -89,63 +107,80 @@ public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, L
                             }
                         }
                     }
    +                msgId = MessageId.makeId(anchorsToIds);
    +            } else {
    +                msgId = MessageId.makeUnanchored();
                 }
    -            MessageId msgId = MessageId.makeId(anchorsToIds);
    -            TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
    -            executor.getExecutorTransfer().transfer(t, tupleExt);
    +            TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), taskId, streamId, msgId);
    +            xsfer.tryTransfer(new AddressedTuple(t, tupleExt), executor.getPendingEmits());
             }
             if (isEventLoggers) {
    -            executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random);
    +            task.sendToEventLogger(executor, values, executor.getComponentId(), null, random, executor.getPendingEmits());
             }
             return outTasks;
         }
     
         @Override
         public void ack(Tuple input) {
    +        if(!ackingEnabled)
    +            return;
             long ackValue = ((TupleImpl) input).getAckVal();
             Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
             for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
    -            executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID,
    +            task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID,
                         new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
    -                    executor.getExecutorTransfer());
    +                    executor.getExecutorTransfer(), executor.getPendingEmits());
             }
             long delta = tupleTimeDelta((TupleImpl) input);
             if (isDebug) {
                 LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
             }
    -        BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
    -        boltAckInfo.applyOn(taskData.getUserContext());
    +
    +        if ( !task.getUserContext().getHooks().isEmpty() ) {
    +            BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
    +            boltAckInfo.applyOn(task.getUserContext());
    +        }
             if (delta >= 0) {
    -            ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
    -                    input.getSourceComponent(), input.getSourceStreamId(), delta);
    +            executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
             }
         }
     
         @Override
         public void fail(Tuple input) {
    +        if(!ackingEnabled)
    +            return;
             Set<Long> roots = input.getMessageId().getAnchors();
             for (Long root : roots) {
    -            executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
    -                    new Values(root), executor.getExecutorTransfer());
    +            task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
    +                    new Values(root), executor.getExecutorTransfer(), executor.getPendingEmits());
             }
             long delta = tupleTimeDelta((TupleImpl) input);
             if (isDebug) {
                 LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
             }
             BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
    -        boltFailInfo.applyOn(taskData.getUserContext());
    -        if (delta >= 0) {
    -            ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
    -                    input.getSourceComponent(), input.getSourceStreamId(), delta);
    +        boltFailInfo.applyOn(task.getUserContext());
    +        if (delta != 0) {
    --- End diff --
    
    Looks like a missed spot: this should be `delta >= 0`.
    
    https://github.com/apache/storm/pull/2241/files?diff=split#r158213916
    
    ```
     (when (<= 0 delta) 
       (stats/bolt-failed-tuple! executor-stats 
                                 (.getSourceComponent tuple) 
                                 (.getSourceStreamId tuple) 
                                 delta)))) 
    ```
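    
    A small Java sketch of why the two guards differ, assuming (as the Clojure code implies) that a negative delta such as -1 means the tuple's processing time was not sampled. The class and method names are hypothetical.
    
    ```java
    public class FailStatsGuard {
        // Assumption: a negative delta (e.g. -1) is a sentinel meaning "tuple was not sampled".
        static final long NOT_SAMPLED = -1L;
    
        // Guard from the Clojure code: record any real measurement, including 0 ms.
        public static boolean shouldRecordCorrect(long delta) {
            return delta >= 0;
        }
    
        // Guard in the diff: records the -1 sentinel and drops genuine 0 ms latencies.
        public static boolean shouldRecordBuggy(long delta) {
            return delta != 0;
        }
    
        public static void main(String[] args) {
            long[] deltas = {NOT_SAMPLED, 0L, 5L};
            for (long d : deltas) {
                System.out.printf("delta=%d  >=0:%b  !=0:%b%n", d, shouldRecordCorrect(d), shouldRecordBuggy(d));
            }
        }
    }
    ```
    
    With `delta != 0`, the unsampled sentinel would be recorded as a failure latency while genuine 0 ms measurements would be dropped.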


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    @roshannaik 
    Hmm... OK. Interesting result. Since you're planning to refresh the numbers in the doc before Storm 2.0.0, we can revisit them then to see whether any odd results show up.
    
    FYI, my desktop specs and OS:
    
    OS: Ubuntu 17.10, 4.13.0-32-generic #35-Ubuntu SMP Thu Jan 25 09:13:46 UTC 2018 x86_64 x86_64 x86_64
    CPU: AMD Ryzen 5 1600, 3.2 GHz, 6 cores (12 logical cores with SMT)
    RAM: Samsung DDR4 32 GB (PC4-19200)
    SSD: Samsung 850 Evo



---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    @Ethanlm, can you please open a JIRA for this issue?


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r159949952
  
    --- Diff: docs/Performance.md ---
    @@ -0,0 +1,132 @@
    +---
    --- End diff --
    
    Great documentation, but can we have some of the other docs link to it?


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167325584
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.daemon.worker;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.messaging.TaskMessage;
    +import org.apache.storm.policy.IWaitStrategy;
    +import org.apache.storm.serialization.ITupleSerializer;
    +import org.apache.storm.tuple.AddressedTuple;
    +import org.apache.storm.utils.JCQueue;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.TransferDrainer;
    +import org.apache.storm.utils.Utils;
    +import org.apache.storm.utils.Utils.SmartThread;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import java.util.Queue;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +// Transfers messages destined to other workers
    +class WorkerTransfer implements JCQueue.Consumer {
    +    static final Logger LOG = LoggerFactory.getLogger(WorkerTransfer.class);
    +
    +    private final TransferDrainer drainer;
    +    private WorkerState workerState;
    +    private IWaitStrategy backPressureWaitStrategy;
    +
    +    JCQueue transferQueue; // [remoteTaskId] -> JCQueue. Some entries maybe null (if no emits to those tasksIds from this worker)
    --- End diff --
    
    Nit: why does this need to be package-private? Can we lock it down more?
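    
    One way to lock it down, sketched with hypothetical names and a `BlockingQueue` standing in for `JCQueue`: make the field `private final` and expose only the narrow operations callers actually need.
    
    ```java
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    // Hypothetical, simplified stand-in for WorkerTransfer; not Storm's actual class.
    public class WorkerTransferSketch {
        // private final: other classes in the package can neither reassign nor drain the queue directly.
        private final BlockingQueue<String> transferQueue;
    
        public WorkerTransferSketch(int capacity) {
            this.transferQueue = new ArrayBlockingQueue<>(capacity);
        }
    
        // Narrow entry point: returns false instead of blocking when the queue is full.
        public boolean tryTransfer(String msg) {
            return transferQueue.offer(msg);
        }
    
        // Read-only view of the state callers legitimately need.
        public int pendingCount() {
            return transferQueue.size();
        }
    }
    ```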


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r168037754
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
    @@ -22,19 +22,23 @@
     import org.apache.storm.executor.TupleInfo;
     import org.apache.storm.spout.ISpout;
     import org.apache.storm.spout.ISpoutOutputCollector;
    +import org.apache.storm.tuple.AddressedTuple;
     import org.apache.storm.tuple.MessageId;
     import org.apache.storm.tuple.TupleImpl;
     import org.apache.storm.tuple.Values;
    -import org.apache.storm.utils.Utils;
     import org.apache.storm.utils.MutableLong;
     import org.apache.storm.utils.RotatingMap;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
     
     import java.util.ArrayList;
     import java.util.List;
     import java.util.Random;
     
    +// Methods are not thread safe. Each thread expected to have a separate instance, or else synchronize externally
    --- End diff --
    
    This is to figure out what we will have for Storm 2.0, since after that we cannot make any breaking changes until 3.0, even if we would like to.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167387629
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import org.apache.storm.serialization.KryoValuesDeserializer;
    +import org.apache.storm.serialization.KryoValuesSerializer;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
    +public class BackPressureStatus implements java.io.Serializable {
    --- End diff --
    
    You should be able to get away with just registering the class and not providing a serializer.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167807269
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
    @@ -44,14 +48,15 @@
         private final Boolean isEventLoggers;
         private final Boolean isDebug;
         private final RotatingMap<Long, TupleInfo> pending;
    +    private TupleInfo globalTupleInfo = new TupleInfo();  // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed).
    --- End diff --
    
    Getting the id of the current thread involves a call to Thread.currentThread(), which is quite [expensive](http://www.jutils.com/checks/performance.html), so it is not a good way to decide whether or not to take the fast path.
    
    As a compromise, I am introducing that check only when topology.debug is enabled. This mode can be used in development to perform checks that are unnecessary or too expensive to do repeatedly in production.
    
    I have opened [STORM-2945](https://issues.apache.org/jira/browse/STORM-2945) to nail down and document support for background emits. We can document the spout and bolt semantics together in the same document.
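    
    A minimal sketch of the debug-only check described above, with hypothetical names: the collector remembers its owner thread (assumed here to be the thread that constructs it) and verifies the caller only when a debug flag, standing in for `topology.debug`, is set. With the flag off, `emit` takes the unchecked fast path.
    
    ```java
    // Hypothetical sketch; not Storm's SpoutOutputCollectorImpl.
    public class DebugCheckedCollector {
        private final boolean isDebug;       // stands in for topology.debug
        private final Thread ownerThread;    // assumption: the constructing thread is the emit thread
        private long emitted = 0;            // deliberately unsynchronized, like the single-thread fast path
    
        public DebugCheckedCollector(boolean isDebug) {
            this.isDebug = isDebug;
            this.ownerThread = Thread.currentThread();
        }
    
        public void emit(Object tuple) {
            // The thread lookup is only paid for when debugging is enabled.
            if (isDebug && Thread.currentThread() != ownerThread) {
                throw new IllegalStateException("emit() called from " + Thread.currentThread().getName()
                    + " but collector is owned by " + ownerThread.getName());
            }
            emitted++; // fast path: no locking, no thread-local lookup
        }
    
        public long emittedCount() {
            return emitted;
        }
    }
    ```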


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167625247
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
    @@ -40,109 +40,117 @@
     import org.apache.storm.nimbus.NimbusInfo;
     
     public interface IStormClusterState {
    -    public List<String> assignments(Runnable callback);
    +    List<String> assignments(Runnable callback);
     
    -    public Assignment assignmentInfo(String stormId, Runnable callback);
    +    Assignment assignmentInfo(String stormId, Runnable callback);
     
    -    public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
    +    VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
     
    -    public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
    +    Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
     
    -    public List<String> blobstoreInfo(String blobKey);
    +    List<String> blobstoreInfo(String blobKey);
     
    -    public List<NimbusSummary> nimbuses();
    +    List<NimbusSummary> nimbuses();
     
    -    public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
    +    void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
     
    -    public List<String> activeStorms();
    +    List<String> activeStorms();
     
         /**
          * Get a storm base for a topology
          * @param stormId the id of the topology
          * @param callback something to call if the data changes (best effort)
          * @return the StormBase or null if it is not alive.
          */
    -    public StormBase stormBase(String stormId, Runnable callback);
    +    StormBase stormBase(String stormId, Runnable callback);
     
    -    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
    +    ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
     
    -    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
    +    List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
     
    -    public List<ProfileRequest> getTopologyProfileRequests(String stormId);
    +    List<ProfileRequest> getTopologyProfileRequests(String stormId);
     
    -    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
    +    void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
     
    -    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
    +    void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
     
    -    public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
    +    Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
     
    -    public List<String> supervisors(Runnable callback);
    +    List<String> supervisors(Runnable callback);
     
    -    public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
    +    SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
     
    -    public void setupHeatbeats(String stormId);
    +    void setupHeatbeats(String stormId);
     
    -    public void teardownHeartbeats(String stormId);
    +    void teardownHeartbeats(String stormId);
     
    -    public void teardownTopologyErrors(String stormId);
    +    void teardownTopologyErrors(String stormId);
     
    -    public List<String> heartbeatStorms();
    +    List<String> heartbeatStorms();
     
    -    public List<String> errorTopologies();
    +    List<String> errorTopologies();
     
    -    public List<String> backpressureTopologies();
    +    /** @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */
    --- End diff --
    
    done. 3.0.0


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2502


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    Yes, I checked it out wrong... My bad.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r168013771
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
    @@ -28,19 +28,21 @@
     import java.util.concurrent.atomic.AtomicLong;
     
     // Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
    -public class BackPressureStatus implements java.io.Serializable {
    -    static final long serialVersionUID = 1L;
    +public class BackPressureStatus {
         public static final short IDENTIFIER = (short)-600;
         private static final int SIZE_OF_ID = 2; // size if IDENTIFIER
         private static final int SIZE_OF_INT = 4;
     
         private static AtomicLong bpCount = new AtomicLong(0);
     
    -    public final String workerId;
    +    public String workerId;
         public final long id;                       // monotonically increasing id
    --- End diff --
    
    Just wondering: being `monotonically increasing` guarantees that `id` is unique within a worker, but not across workers, and the counter also resets to 0 after a worker crashes and restarts. Could that hurt the backpressure logic in any way?
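    
    To make the question concrete, here is a hypothetical receiver that uses `id` to discard stale statuses per worker. It assumes a restarted worker reports the same `workerId`; this is not Storm's actual backpressure handling, only an illustration of where a counter reset could matter.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    // Hypothetical receiver: drops any status whose id is not newer than the last one seen from that worker.
    public class StaleStatusFilter {
        private final Map<String, Long> lastIdByWorker = new HashMap<>();
    
        /** Returns true if the status is accepted as newer than the last one from this worker. */
        public boolean accept(String workerId, long id) {
            Long last = lastIdByWorker.get(workerId);
            if (last != null && id <= last) {
                return false; // treated as stale or duplicate
            }
            lastIdByWorker.put(workerId, id);
            return true;
        }
    
        public static void main(String[] args) {
            StaleStatusFilter f = new StaleStatusFilter();
            System.out.println(f.accept("w1", 41));
            System.out.println(f.accept("w1", 42));
            // w1 restarts and its counter starts over, so a fresh status is wrongly dropped.
            System.out.println(f.accept("w1", 1));
            // Equal ids from different workers do not collide, because lookups are keyed by workerId.
            System.out.println(f.accept("w2", 1));
        }
    }
    ```
    
    If the receiver instead acts on whatever status arrives last without comparing ids, a reset counter has no effect.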


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167286165
  
    --- Diff: pom.xml ---
    @@ -259,6 +259,7 @@
             <snakeyaml.version>1.11</snakeyaml.version>
             <httpclient.version>4.3.3</httpclient.version>
             <clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
    +        <jctools.version>2.0.1</jctools.version>
    --- End diff --
    
    Why does flux need jctools?  Shouldn't it come with storm-client?


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167381302
  
    --- Diff: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---
    @@ -40,109 +40,117 @@
     import org.apache.storm.nimbus.NimbusInfo;
     
     public interface IStormClusterState {
    -    public List<String> assignments(Runnable callback);
    +    List<String> assignments(Runnable callback);
     
    -    public Assignment assignmentInfo(String stormId, Runnable callback);
    +    Assignment assignmentInfo(String stormId, Runnable callback);
     
    -    public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
    +    VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
     
    -    public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
    +    Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
     
    -    public List<String> blobstoreInfo(String blobKey);
    +    List<String> blobstoreInfo(String blobKey);
     
    -    public List<NimbusSummary> nimbuses();
    +    List<NimbusSummary> nimbuses();
     
    -    public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
    +    void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
     
    -    public List<String> activeStorms();
    +    List<String> activeStorms();
     
         /**
          * Get a storm base for a topology
          * @param stormId the id of the topology
          * @param callback something to call if the data changes (best effort)
          * @return the StormBase or null if it is not alive.
          */
    -    public StormBase stormBase(String stormId, Runnable callback);
    +    StormBase stormBase(String stormId, Runnable callback);
     
    -    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
    +    ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
     
    -    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
    +    List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
     
    -    public List<ProfileRequest> getTopologyProfileRequests(String stormId);
    +    List<ProfileRequest> getTopologyProfileRequests(String stormId);
     
    -    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
    +    void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
     
    -    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
    +    void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
     
    -    public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
    +    Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
     
    -    public List<String> supervisors(Runnable callback);
    +    List<String> supervisors(Runnable callback);
     
    -    public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
    +    SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
     
    -    public void setupHeatbeats(String stormId);
    +    void setupHeatbeats(String stormId);
     
    -    public void teardownHeartbeats(String stormId);
    +    void teardownHeartbeats(String stormId);
     
    -    public void teardownTopologyErrors(String stormId);
    +    void teardownTopologyErrors(String stormId);
     
    -    public List<String> heartbeatStorms();
    +    List<String> heartbeatStorms();
     
    -    public List<String> errorTopologies();
    +    List<String> errorTopologies();
     
    -    public List<String> backpressureTopologies();
    +    /** @deprecated: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon. */
    --- End diff --
    
    Filed STORM-2944. Can you please add 3.0 as a version in JIRA so that this issue can be associated with it?


---

[GitHub] storm issue #2502: new PR for STORM-2306 : Messaging subsystem redesign

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2502
  
    This looks really really good.  I ran through the unit tests and some performance tests and everything looks great.  Now I feel like I can get excited about this going in.  I am going to spend tomorrow going through the code, but I am really hopeful that we can get this merged in next week.


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167287026
  
    --- Diff: storm-client/src/jvm/org/apache/storm/StormTimer.java ---
    @@ -97,6 +97,8 @@ public void run() {
                             // events.
                             Time.sleep(1000);
                         }
    +                    if(Thread.interrupted())
    +                        this.active.set(false);
    --- End diff --
    
    Nit: can we wrap this in `{` and `}` and add a space after the `if` to match the style guide?
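    
    For reference, the requested style applied to the snippet, wrapped in a hypothetical class so it compiles on its own. Note that `Thread.interrupted()` also clears the current thread's interrupt flag.
    
    ```java
    import java.util.concurrent.atomic.AtomicBoolean;
    
    // Hypothetical stand-in for the timer loop; only the if-statement style is the point here.
    public class TimerLoopStyle {
        private final AtomicBoolean active = new AtomicBoolean(true);
    
        // Checks (and clears) the interrupt flag; deactivates the loop if it was set.
        public void checkInterrupted() {
            if (Thread.interrupted()) {
                this.active.set(false);
            }
        }
    
        public boolean isActive() {
            return active.get();
        }
    }
    ```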


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167387808
  
    --- Diff: storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java ---
    @@ -44,14 +48,15 @@
         private final Boolean isEventLoggers;
         private final Boolean isDebug;
         private final RotatingMap<Long, TupleInfo> pending;
    +    private TupleInfo globalTupleInfo = new TupleInfo();  // thread safety: assumes Collector.emit*() calls are externally synchronized (if needed).
    --- End diff --
    
    Can we add a sanity check for the fast path, plus documentation? Check whether the id of the current thread matches the id of the main thread we expect emits to come from. If so, take the fast path. If not, fall back to a thread local or some kind of locking, and document why you never want the spout to emit from a background thread.
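    Here is one way the suggested check could look, as a sketch. The helper remembers the thread expected to emit and hands that thread a shared, unsynchronized scratch object. Any other thread gets its own instance through a `ThreadLocal`. The class names are hypothetical (`EmitScratch` stands in for `TupleInfo`). The sketch compares `Thread` references, which identifies the calling thread just as comparing ids would:

```java
// Hypothetical scratch object standing in for TupleInfo.
class EmitScratch {
    long rootId;
    String stream;
}

// Hypothetical helper; names are illustrative, not Storm's API.
class ThreadCheckedScratch {
    // The executor thread that is expected to make all emit calls.
    private final Thread owner;
    // Reused on the fast path; safe only because only the owner touches it.
    private final EmitScratch fastPathScratch = new EmitScratch();
    // Fallback: one instance per foreign thread, so no state is shared.
    private final ThreadLocal<EmitScratch> fallback =
            ThreadLocal.withInitial(EmitScratch::new);

    ThreadCheckedScratch(Thread owner) {
        this.owner = owner;
    }

    EmitScratch get() {
        if (Thread.currentThread() == owner) {
            return fastPathScratch; // fast path: no locking, no lookup
        }
        // Slow path: the spout is emitting from a background thread.
        return fallback.get();
    }
}
```

The owner check costs one reference comparison per emit, so the common single-threaded case stays cheap. A spout that does emit from a background thread gets isolated state instead of silently corrupting the shared instance.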


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167814459
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/BackPressureStatus.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License
    + */
    +
    +package org.apache.storm.messaging.netty;
    +
    +import org.apache.storm.serialization.KryoValuesDeserializer;
    +import org.apache.storm.serialization.KryoValuesSerializer;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +
    +import java.io.IOException;
    +import java.util.Collection;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +// Instances of this type are sent from NettyWorker to upstream WorkerTransfer to indicate BackPressure situation
    +public class BackPressureStatus implements java.io.Serializable {
    --- End diff --
    
    Thanks for the detailed comment. Made the changes and tested them as well.
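    Per the class comment, instances of this type travel from the NettyWorker to the upstream WorkerTransfer to signal backpressure. Below is a minimal sketch of such a message. It assumes the message carries the sender's worker id, a monotonically increasing message id, and the tasks entering and leaving backpressure. The field names are assumptions. For a self-contained round trip, the sketch uses plain Java serialization rather than the Kryo serializer and Netty `ChannelBuffer` imported in the diff:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a backpressure status message; field names are assumptions.
class BackPressureStatusSketch implements Serializable {
    private static final long serialVersionUID = 1L;
    // Source of monotonically increasing ids for correlating sent/received messages.
    private static final AtomicLong NEXT_ID = new AtomicLong(0);

    final String workerId;
    final long id;
    final Collection<Integer> bpTasks;    // tasks currently under backpressure
    final Collection<Integer> nonBpTasks; // tasks no longer under backpressure

    BackPressureStatusSketch(String workerId, Collection<Integer> bpTasks,
                             Collection<Integer> nonBpTasks) {
        this.workerId = workerId;
        this.id = NEXT_ID.incrementAndGet();
        this.bpTasks = new ArrayList<>(bpTasks);
        this.nonBpTasks = new ArrayList<>(nonBpTasks);
    }

    byte[] toBytes() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(this);
        }
        return bytes.toByteArray();
    }

    static BackPressureStatusSketch fromBytes(byte[] data)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (BackPressureStatusSketch) in.readObject();
        }
    }
}
```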


---

[GitHub] storm pull request #2502: new PR for STORM-2306 : Messaging subsystem redesi...

Posted by roshannaik <gi...@git.apache.org>.
Github user roshannaik commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r160011014
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
    @@ -177,6 +195,35 @@ public BuiltinMetrics getBuiltInMetrics() {
             return builtInMetrics;
         }
     
    +
    +    // Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument
    +    public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
    +        Tuple tuple = getTuple(stream, values);
    +        List<Integer> tasks = getOutgoingTasks(stream, values);
    +        for (Integer t : tasks) {
    +            AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
    +            transfer.tryTransfer(addressedTuple, pendingEmits);
    +        }
    +    }
    +
    +    /**
    +     * Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api).
    +     */
    +    public void sendToEventLogger(Executor executor, List values,
    --- End diff --
    
    @HeartSaVioR, you pointed out that some optimizations are possible here that we could tackle in another JIRA. Can you elaborate, or capture your thoughts in a JIRA?
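    The comment on `sendUnanchored` describes a non-blocking hand-off. Anything that cannot be delivered immediately is parked in `pendingEmits` rather than blocking the caller. Below is a sketch of that pattern. It uses hypothetical stand-ins for `ExecutorTransfer` and `AddressedTuple`, and relies on `ArrayBlockingQueue.offer()`, which returns `false` instead of waiting when the queue is full:

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical stand-in for AddressedTuple.
class Addressed {
    final int destTask;
    final Object payload;

    Addressed(int destTask, Object payload) {
        this.destTask = destTask;
        this.payload = payload;
    }
}

// Hypothetical stand-in for ExecutorTransfer; not Storm's implementation.
class TransferSketch {
    private final ArrayBlockingQueue<Addressed> outbound;

    TransferSketch(int capacity) {
        outbound = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking: never waits for space in the outbound queue.
    boolean tryTransfer(Addressed t, Queue<Addressed> pendingEmits) {
        // Assumed ordering rule: if earlier tuples are still waiting, queue behind them.
        if (pendingEmits.isEmpty() && outbound.offer(t)) {
            return true;
        }
        pendingEmits.add(t); // caller retries these later
        return false;
    }

    // One copy of the payload per destination task, as in sendUnanchored().
    void sendUnanchored(Object payload, List<Integer> destTasks,
                        Queue<Addressed> pendingEmits) {
        for (int task : destTasks) {
            tryTransfer(new Addressed(task, payload), pendingEmits);
        }
    }

    int outboundSize() {
        return outbound.size();
    }
}
```

The sketch also parks new tuples whenever `pendingEmits` is already non-empty. This is an assumed design choice that keeps emits in order, so a later tuple cannot overtake one that is still waiting for a retry.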


---