Posted to dev@storm.apache.org by "Jungtaek Lim (JIRA)" <ji...@apache.org> on 2015/05/04 08:40:06 UTC

[jira] [Commented] (STORM-794) Trident Topology with some situation seems not handle deactivate during graceful shutdown

    [ https://issues.apache.org/jira/browse/STORM-794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14526316#comment-14526316 ] 

Jungtaek Lim commented on STORM-794:
------------------------------------

I found the issue.

I modified executor.clj to log the sizes of overflow-buffer and pending, the value of max-spout-pending, and the active flag.

{code}
(let [active? @(:storm-active-atom executor-data)
      curr-count (.get emitted-count)]
  (do
    (log-message "overflow-buffer's size " (.size overflow-buffer) " and pending size " (.size pending)
                 " and max-spout-pending " max-spout-pending " and active " active?)

    (if (and (.isEmpty overflow-buffer)
             (or (not max-spout-pending)
                 (< (.size pending) max-spout-pending)))
      (if active?
        (do
          (when-not @last-active
            (reset! last-active true)
            (log-message "Activating spout " component-id ":" (keys task-datas))
            (fast-list-iter [^ISpout spout spouts] (.activate spout)))

          (fast-list-iter [^ISpout spout spouts]
            (do
              (log-message "pending size " (.size pending) " and max-spout-pending " max-spout-pending
                           " and active " active?)
              (.nextTuple spout))))
        (do
          (when @last-active
            (reset! last-active false)
            (log-message "Deactivating spout " component-id ":" (keys task-datas))
            (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
          ;; TODO: log that it's getting throttled
          (Time/sleep 100)))))
  (if (and (= curr-count (.get emitted-count)) active?)
    (do (.increment empty-emit-streak)
        (.emptyEmit spout-wait-strategy (.get empty-emit-streak)))
    (.set empty-emit-streak 0)))
{code}

Below are the logs. Please take a look at tx 41696:0.

{noformat}
2015-05-04 14:02:37.198 INFO  [Thread-16-$mastercoord-bg0][executor] Acking message 41696:0
2015-05-04 14:02:37.198 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 $commit [41696:0]
2015-05-04 14:02:37.198 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 __ack_init [-6284616655193060224 0 1]
2015-05-04 14:02:37.199 INFO  [Thread-16-$mastercoord-bg0][executor] overflow-buffer's size 0 and pending size 10 and max-spout-pending 10 and active true
2015-05-04 14:02:37.199 INFO  [Thread-4-__acker][executor] Processing received message source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-6284616655193060224 0 1]
2015-05-04 14:02:37.199 INFO  [Thread-4-__acker][task] Emitting direct: 1; __acker __ack_ack [-6284616655193060224]
2015-05-04 14:02:37.200 INFO  [Thread-16-$mastercoord-bg0][executor] Processing received message source: __acker:3, stream: __ack_ack, id: {}, [-6284616655193060224]
2015-05-04 14:02:37.200 INFO  [Thread-16-$mastercoord-bg0][executor] Acking message 41696:0
2015-05-04 14:02:37.200 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 $success [41696:0]
2015-05-04 14:02:37.200 INFO  [Thread-2-$spoutcoord-spout0][executor] Processing received message source: $mastercoord-bg0:1, stream: $success, id: {}, [41696:0]
2015-05-04 14:02:37.203 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 $batch [41706:0]
2015-05-04 14:02:37.204 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 __ack_init [8327784611027372914 600406483975750023 1]
2015-05-04 14:02:37.204 INFO  [Thread-2-$spoutcoord-spout0][executor] Processing received message source: $mastercoord-bg0:1, stream: $batch, id: {8327784611027372914=600406483975750023}, [41706:0]
2015-05-04 14:02:37.204 INFO  [Thread-4-__acker][executor] Processing received message source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [8327784611027372914 600406483975750023 1]
2015-05-04 14:02:37.204 INFO  [Thread-16-$mastercoord-bg0][executor] overflow-buffer's size 0 and pending size 10 and max-spout-pending 10 and active true
{noformat}

When MasterBatchCoordinator.ack() is called and the attempt's state is COMMITTING, MasterBatchCoordinator treats the current tx as completed and immediately starts a new transaction by increasing the current tx id and calling sync() directly.
In the logs you can see that $mastercoord-bg0 emits 41696:0 on $success and then starts 41706 right away (note that max spout pending is 10), so the pending size is back at 10 by the time the executor logs it again. The async loop comes too late.

So, even though the executor can see that active is false, the outer if-condition above (pending size < max-spout-pending) can always be false, so the deactivate branch is never reached and the spout is never deactivated.
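
To make the condition concrete, here is a minimal Java sketch of the gate from the executor snippet above. It is only a simplified model for illustration, not Storm's code: the class name SpoutGateSketch, the Outcome enum, and the step method are hypothetical, and a null maxSpoutPending stands in for "max-spout-pending not set".

```java
// Simplified, hypothetical model of the gate in the executor snippet; not Storm's actual code.
final class SpoutGateSketch {
    enum Outcome { EMIT, DEACTIVATE, IDLE, SKIPPED }

    // Mirrors the nested ifs: the active flag is only consulted when the outer gate is open.
    static Outcome step(boolean overflowEmpty, int pending, Integer maxSpoutPending,
                        boolean active, boolean lastActive) {
        boolean gateOpen = overflowEmpty
                && (maxSpoutPending == null || pending < maxSpoutPending);
        if (!gateOpen) {
            // Neither the activate nor the deactivate branch runs.
            return Outcome.SKIPPED;
        }
        if (active) {
            // Corresponds to (activate if needed, then) nextTuple.
            return Outcome.EMIT;
        }
        // Inactive: deactivate once, then just sleep on later iterations.
        return lastActive ? Outcome.DEACTIVATE : Outcome.IDLE;
    }

    public static void main(String[] args) {
        // Values from the log above: pending 10, max-spout-pending 10, topology no longer active.
        System.out.println(step(true, 10, 10, false, true));
        // Same state, but with one free slot in pending.
        System.out.println(step(true, 9, 10, false, true));
    }
}
```

With pending equal to max-spout-pending the first call yields SKIPPED, while the second call, with one free slot, yields DEACTIVATE. Since MasterBatchCoordinator refills pending inside ack(), the loop keeps observing the first case.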

P.S. My spout can sleep for more than 1 sec before emitting from emitBatch(), but I don't think that should be an issue.
Btw, the complete latency of a batch is about 20 secs. During this time the spout doesn't emit anything from emitBatch().
{noformat}
2015-05-04 14:02:17.173 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 $batch [41696:0]
2015-05-04 14:02:37.198 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 $commit [41696:0]
2015-05-04 14:02:37.200 INFO  [Thread-16-$mastercoord-bg0][task] Emitting: $mastercoord-bg0 $success [41696:0]
{noformat}
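
For reference, the "about 20 secs" figure can be checked directly from the timestamps of the three log lines above. The following is a small sketch using java.time; the class name BatchLatencySketch and the method millisBetween are hypothetical helpers, and the timestamp pattern is assumed from the log lines shown.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for reading latencies off the worker log timestamps.
final class BatchLatencySketch {
    // Pattern assumed from the log lines above, e.g. "2015-05-04 14:02:17.173".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    static long millisBetween(String start, String end) {
        LocalDateTime from = LocalDateTime.parse(start, LOG_TS);
        LocalDateTime to = LocalDateTime.parse(end, LOG_TS);
        return Duration.between(from, to).toMillis();
    }

    public static void main(String[] args) {
        // $batch -> $commit for tx 41696:0
        System.out.println(millisBetween("2015-05-04 14:02:17.173", "2015-05-04 14:02:37.198"));
        // $batch -> $success for tx 41696:0
        System.out.println(millisBetween("2015-05-04 14:02:17.173", "2015-05-04 14:02:37.200"));
    }
}
```

This prints 20025 and 20027, i.e. roughly 20 seconds from $batch to $commit and to $success.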

> Trident Topology with some situation seems not handle deactivate during graceful shutdown
> -----------------------------------------------------------------------------------------
>
>                 Key: STORM-794
>                 URL: https://issues.apache.org/jira/browse/STORM-794
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 0.9.3
>            Reporter: Jungtaek Lim
>
> I hit an issue with a Trident topology in our production environment.
> Normally, when we kill a topology via the UI, Nimbus changes the topology status to "killed", and when the spout detects the new status it is deactivated, so bolts can process the remaining tuples within the wait time.
> AFAIK that's how Storm guarantees graceful shutdown.
> But a Trident topology does not seem to handle "deactivate" when we try to shut the topology down gracefully.
> MasterBatchCoordinator never stops starting the next transaction, so the Trident spout never stops emitting and the bolts (functions) always have tuples to process.
> Topology setting
> - 1 worker, 1 acker
> - max spout pending: 1
> - TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS : 5
> -- It may be weird but MasterBatchCoordinator's default value is 1
> * Nimbus log
> {code}
> 2015-04-20 09:59:07.954 INFO  [pool-5-thread-41][nimbus] Delaying event :remove for 120 secs for BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015
> ...
> 2015-04-20 09:59:07.955 INFO  [pool-5-thread-41][nimbus] Updated BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015 with status {:type :killed, :kill-time-secs 120}
> ...
> 2015-04-20 10:01:07.956 INFO  [timer][nimbus] Killing topology: BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015
> ...
> 2015-04-20 10:01:14.448 INFO  [timer][nimbus] Cleaning up BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015
> {code}
> * Supervisor log
> {code}
> 2015-04-20 10:01:07.960 INFO  [Thread-1][supervisor] Removing code for storm id BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015
> 2015-04-20 10:01:07.962 INFO  [Thread-2][supervisor] Shutting down and clearing state for id 9719259e-528c-4336-abf9-592c1bb9a00b. Current supervisor time: 1429491667. State: :disallowed, Heartbeat: #backtype.storm.daemon.common.WorkerHeartbeat{:time-secs 1429491667, :storm-id "BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015", :executors #{[2 2] [3 3] [4 4] [5 5] [6 6] [7 7] [8 8] [9 9] [10 10] [11 11] [12 12] [13 13] [14 14] [-1 -1] [1 1]}, :port 6706}
> 2015-04-20 10:01:07.962 INFO  [Thread-2][supervisor] Shutting down 5bc084a2-b668-4610-86f6-9b93304d40a8:9719259e-528c-4336-abf9-592c1bb9a00b
> 2015-04-20 10:01:08.974 INFO  [Thread-2][supervisor] Shut down 5bc084a2-b668-4610-86f6-9b93304d40a8:9719259e-528c-4336-abf9-592c1bb9a00b
> {code}
> * Worker log
> {code}
> 2015-04-20 10:01:07.985 INFO  [Thread-33][worker] Shutting down worker BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015 5bc084a2-b668-4610-86f6-9b93304d40a8 6706
> 2015-04-20 10:01:07.985 INFO  [Thread-33][worker] Shutting down receive thread
> 2015-04-20 10:01:07.988 WARN  [Thread-33][ExponentialBackoffRetry] maxRetries too large (300). Pinning to 29
> 2015-04-20 10:01:07.988 INFO  [Thread-33][StormBoundedExponentialBackoffRetry] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
> 2015-04-20 10:01:07.988 INFO  [Thread-33][Client] New Netty Client, connect to localhost, 6706, config: , buffer_size: 5242880
> 2015-04-20 10:01:07.991 INFO  [client-schedule-service-1][Client] Reconnect started for Netty-Client-localhost/127.0.0.1:6706... [0]
> 2015-04-20 10:01:07.996 INFO  [Thread-33][loader] Shutting down receiving-thread: [BFDC-topology-DynamicCollect-68c9d7b4-72-1429491015, 6706]
> ...
> 2015-04-20 10:01:08.044 INFO  [Thread-33][Client] Closing Netty Client Netty-Client-localhost/127.0.0.1:6706
> 2015-04-20 10:01:08.044 INFO  [Thread-33][Client] Waiting for pending batchs to be sent with Netty-Client-localhost/127.0.0.1:6706..., timeout: 600000ms, pendings: 1
> {code}
> I found the activation log entry, but cannot find a deactivation log entry.
> {code}
> 2015-04-20 09:50:24.556 INFO  [Thread-30-$mastercoord-bg0][executor] Activating spout $mastercoord-bg0:(1)
> {code}
> Please note that it doesn't work when I just push the button to "deactivate" the topology via the UI.
> We're changing our topology to a normal spout-bolt topology, but personally I'd like to see this resolved.


