Posted to dev@storm.apache.org by Abhishek Agarwal <ab...@gmail.com> on 2015/09/02 12:56:51 UTC

Overflow buffer and metrics

I have been facing a scenario recently wherein the topology just hangs.
In the stack trace, I can see that the KafkaSpout is stuck in metrics_tick.invoke
because it cannot find a free slot in the disruptor queue. The acker task is
blocked on a disruptor queue as well. I noticed that Nathan had fixed a
deadlock scenario by introducing an overflow buffer, but that overflow buffer
is not used when metrics are emitted. Could that not result in the same
deadlock?
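
For context, here is a minimal sketch of the overflow-buffer pattern I am
referring to (hypothetical names, with a plain BlockingQueue standing in for
the disruptor queue; this is not Storm's actual code). As far as I can tell,
the regular spout emit path behaves like publishWithOverflow/drainOverflow,
while the metrics path behaves like blockingPublish:

    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Illustration only: a bounded queue standing in for the disruptor queue,
    // plus an unbounded overflow buffer. A blocking publish can deadlock if the
    // consumer of the bounded queue is itself waiting on us; the overflow
    // variant never parks the publishing thread.
    public class OverflowPublishSketch {

        private final BlockingQueue<Object> boundedQueue = new ArrayBlockingQueue<>(1024);
        private final Queue<Object> overflowBuffer = new ConcurrentLinkedQueue<>();

        // Roughly how the metrics path behaves per the stack trace below:
        // park until a slot frees up in the destination queue.
        public void blockingPublish(Object tuple) throws InterruptedException {
            boundedQueue.put(tuple); // parks when the queue is full -> potential deadlock
        }

        // The overflow-buffer pattern: never park; stash the tuple and move on.
        public void publishWithOverflow(Object tuple) {
            if (!overflowBuffer.isEmpty() || !boundedQueue.offer(tuple)) {
                overflowBuffer.add(tuple); // keep ordering: drain overflow before new tuples
            }
        }

        // Called from the publishing thread's main loop on every iteration, so it
        // keeps consuming its own receive queue even while downstream is full.
        public void drainOverflow() {
            Object tuple;
            while ((tuple = overflowBuffer.peek()) != null && boundedQueue.offer(tuple)) {
                overflowBuffer.poll();
            }
        }
    }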
Here is the stack trace of the KafkaSpout thread:

"Thread-109-KafkaSpout" #145 prio=5 os_prio=0 tid=0x00007f7f20aae800
nid=0x3c81 runnable [0x00007f7cceae9000]

   java.lang.Thread.State: TIMED_WAITING (parking)

        at sun.misc.Unsafe.park(Native Method)

        at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)

        at
com.lmax.disruptor.SingleThreadedClaimStrategy.waitForFreeSlotAt(SingleThreadedClaimStrategy.java:129)

        at
com.lmax.disruptor.SingleThreadedClaimStrategy.incrementAndGet(SingleThreadedClaimStrategy.java:81)

        at com.lmax.disruptor.Sequencer.next(Sequencer.java:127)

        at
backtype.storm.utils.DisruptorQueue.publishDirect(DisruptorQueue.java:174)

        at
backtype.storm.utils.DisruptorQueue.publish(DisruptorQueue.java:167)

        at backtype.storm.disruptor$publish.invoke(disruptor.clj:66)

        at
backtype.storm.daemon.executor$mk_executor_transfer_fn$this__6431.invoke(executor.clj:190)

        at
backtype.storm.daemon.executor$mk_executor_transfer_fn$this__6431.invoke(executor.clj:197)

        at backtype.storm.daemon.task$send_unanchored.invoke(task.clj:112)

        at backtype.storm.daemon.task$send_unanchored.invoke(task.clj:117)

        at
backtype.storm.daemon.executor$metrics_tick.invoke(executor.clj:296)

        at
backtype.storm.daemon.executor$fn__6579$tuple_action_fn__6585.invoke(executor.clj:438)

        at
backtype.storm.daemon.executor$mk_task_receiver$fn__6570.invoke(executor.clj:404)

        at
backtype.storm.disruptor$clojure_handler$reify__1605.onEvent(disruptor.clj:58)

        at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)

        at
backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87)

        at backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76)

        at
backtype.storm.daemon.executor$fn__6579$fn__6594$fn__6623.invoke(executor.clj:542)

        at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463)

        at clojure.lang.AFn.run(AFn.java:24)

        at java.lang.Thread.run(Thread.java:745)

-- 
Regards,
Abhishek Agarwal

Re: Overflow buffer and metrics

Posted by Abhishek Agarwal <ab...@gmail.com>.
Guys,
Any thoughts here? I can see that emitting a metric is a blocking call in the
spout, and in my stack trace this is exactly where the spout is stuck.
Shouldn't we use the overflow buffer for emitting metrics as well?
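
To make the failure mode concrete, here is a tiny self-contained demo of the
cycle (hypothetical names, plain BlockingQueues instead of disruptor queues):
two threads that each block while publishing into the other's full queue stop
draining their own queues, so neither ever frees a slot for the other. That is
essentially the shape of the hang in the stack trace above, and a non-blocking
publish with an overflow buffer on the metrics path would avoid it:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Illustration only (hypothetical setup): two threads, each the sole
    // consumer of its own bounded queue, each doing a *blocking* publish into
    // the other's queue. Once both queues fill up, neither thread can keep
    // consuming, so both park forever.
    public class BlockingPublishDeadlockDemo {

        public static void main(String[] args) {
            BlockingQueue<Integer> spoutReceive = new ArrayBlockingQueue<>(2);
            BlockingQueue<Integer> ackerReceive = new ArrayBlockingQueue<>(2);

            Thread spout = new Thread(() -> pump(spoutReceive, ackerReceive), "spout");
            Thread acker = new Thread(() -> pump(ackerReceive, spoutReceive), "acker");
            spout.start();
            acker.start();
            // Both threads end up permanently parked inside put() and never make
            // progress. Routing either side's publish through an overflow buffer
            // (previous sketch) lets that thread keep draining its own queue and
            // breaks the cycle.
        }

        // Consume one tuple from "mine", then publish two tuples to "theirs".
        private static void pump(BlockingQueue<Integer> mine, BlockingQueue<Integer> theirs) {
            try {
                for (int i = 0; ; i++) {
                    mine.poll();   // drain a little...
                    theirs.put(i); // ...but block here once "theirs" is full
                    theirs.put(i); // publishing more than we consume fills both queues
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }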


-- 
Regards,
Abhishek Agarwal