Posted to dev@storm.apache.org by 鞠大升 <da...@gmail.com> on 2014/03/10 08:52:27 UTC

Topology hangs after bolt dies with "Async loop died" because KryoTupleSerializer.serialize throws NullPointerException

Hi all,

*Background:*
---------------------------------------------------------------------------------------------------------------------
We are using Storm 0.9.0.1. Our topology has a KafkaSpout (reads logs from
Kafka), a ParserBolt (parses the logs), and a SaverBolt (writes the results
back to Kafka). The KafkaSpout has 16 threads, the ParserBolt has 32 threads,
and the SaverBolt has 16 threads. The ParserBolt is written in Python using
Multilang.

*Problems:*
---------------------------------------------------------------------------------------------------------------------
Sometimes KryoTupleSerializer.serialize throws a NullPointerException, which
kills the ParserBolt. The supervisor then restarts the bolts, but the new bolt
never receives any tuples, and the topology hangs until we restart it.

*Analysis:*
---------------------------------------------------------------------------------------------------------------------
We found a Troubleshooting page
(https://github.com/nathanmarz/storm/wiki/Troubleshooting#wiki-nullpointerexception-from-deep-inside-storm)
that says: "This is caused by having multiple threads issue methods on the
OutputCollector. All emits, acks, and fails must happen on the same thread.
One subtle way this can happen is if you make a IBasicBolt that emits on a
separate thread. IBasicBolt's automatically ack after execute is called, so
this would cause multiple threads to use the OutputCollector leading to this
exception. When using a basic bolt, all emits must happen in the same thread
that runs execute."
We also found that in ShellBolt.java, _readerThread is a separate thread, and
handleEmit, called from that thread, calls emit to emit new tuples.

But another wiki page
(https://github.com/nathanmarz/storm/wiki/Concepts#wiki-bolts) says: "Its
perfectly fine to launch new threads in bolts that do processing
asynchronously. OutputCollector
<http://nathanmarz.github.com/storm/doc/backtype/storm/task/OutputCollector.html>
is thread-safe and can be called at any time."
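Taking the Troubleshooting page's rule at face value (only one thread calls the collector), one way to keep asynchronous work while still honoring it is to let worker threads hand finished results to a thread-safe queue, and have only the thread that runs execute drain that queue and call emit. This is a minimal sketch under that assumption, not ShellBolt's code: `Emitter` is a hypothetical stand-in for OutputCollector, and `SingleThreadEmitBuffer` is an illustrative name.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for backtype.storm.task.OutputCollector; only emit is modeled.
interface Emitter {
    void emit(List<Object> values);
}

// Worker threads call offer(); only the thread that owns the Emitter (the one
// running execute) calls drainTo(), so the Emitter is never used concurrently.
final class SingleThreadEmitBuffer {
    private final ConcurrentLinkedQueue<List<Object>> pending = new ConcurrentLinkedQueue<>();

    // Safe to call from any thread: ConcurrentLinkedQueue is thread-safe.
    void offer(List<Object> values) {
        pending.add(values);
    }

    // Call only from the owning thread. Emits everything queued so far, in
    // FIFO order, and returns how many tuples were emitted.
    int drainTo(Emitter emitter) {
        int emitted = 0;
        List<Object> values;
        while ((values = pending.poll()) != null) {
            emitter.emit(values);
            emitted++;
        }
        return emitted;
    }
}
```

Because the queue, not the collector, is the only object shared between threads, the collector itself needs no lock; the trade-off is that results are emitted only when the owning thread next drains the queue.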

*Our questions:*
---------------------------------------------------------------------------------------------------------------------
1) Is OutputCollector thread-safe or not? If it is not, so that all emits,
acks, and fails must happen on the same thread, does ShellBolt have a bug?
2) Why does the topology hang after the bolt is restarted? By the way, we are
using Netty.

Can anyone help?

The worker log:
---------------------------------------------------------------------------------------------------------------------

2014-03-10 12:48:31 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__849.invoke(disruptor.clj:74) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.util$async_loop$fn__469.invoke(util.clj:406) ~[storm-core-0.9.0.1.jar:na]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
        at java.lang.Thread.run(Thread.java:722) [na:1.7.0_21]
Caused by: java.lang.NullPointerException: null
        at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:24) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.daemon.worker$mk_transfer_fn$fn__4335$fn__4339.invoke(worker.clj:108) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.util$fast_list_map.invoke(util.clj:804) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.daemon.worker$mk_transfer_fn$fn__4335.invoke(worker.clj:108) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__4060.invoke(executor.clj:240) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.disruptor$clojure_handler$reify__836.onEvent(disruptor.clj:43) ~[storm-core-0.9.0.1.jar:na]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0.1.jar:na]
        ... 6 common frames omitted



-- 
dashengju
+86 13810875910
dashengju@gmail.com

Re: Topology hangs after bolt dies with "Async loop died" because KryoTupleSerializer.serialize throws NullPointerException

Posted by Nathan Leung <nc...@gmail.com>.
Try implementing IRichBolt so you have direct control over emits and acks,
and also synchronize access to the OutputCollector so that multiple threads
aren't using it simultaneously.
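A minimal sketch of that advice, assuming a hypothetical `TupleCollector` interface that models only emit, ack, and fail (the real backtype.storm.task.OutputCollector has richer signatures with anchors and stream ids). Every call on the wrapper takes the same lock, so at most one thread is inside the underlying collector at a time.

```java
import java.util.List;

// Hypothetical minimal view of the collector calls a bolt makes; NOT the real
// OutputCollector API, which also takes anchors and stream ids.
interface TupleCollector {
    void emit(List<Object> values);
    void ack(Object tuple);
    void fail(Object tuple);
}

// Decorator that serializes all access to the wrapped collector through one lock.
final class SynchronizedCollector implements TupleCollector {
    private final TupleCollector delegate;
    private final Object lock = new Object();

    SynchronizedCollector(TupleCollector delegate) {
        this.delegate = delegate;
    }

    @Override
    public void emit(List<Object> values) {
        synchronized (lock) { delegate.emit(values); }
    }

    @Override
    public void ack(Object tuple) {
        synchronized (lock) { delegate.ack(tuple); }
    }

    @Override
    public void fail(Object tuple) {
        synchronized (lock) { delegate.fail(tuple); }
    }
}
```

Wrapping the collector once in prepare and using only the wrapper everywhere keeps the locking in one place; any code path that still calls the raw collector directly bypasses the lock.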



Re: Topology hangs after bolt dies with "Async loop died" because KryoTupleSerializer.serialize throws NullPointerException

Posted by 鞠大升 <da...@gmail.com>.
@Evans, we are using Storm 0.9.0.1:
https://github.com/apache/incubator-storm/tree/0.9.0.1

According to the stack trace, the exception is thrown by KryoTupleSerializer's
serialize method. The code is:

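    public byte[] serialize(Tuple tuple) {
        try {
            // _kryoOut is a reusable output buffer held by the serializer instance.
            _kryoOut.clear();
            // Source task id, written as a variable-length int.
            _kryoOut.writeInt(tuple.getSourceTask(), true);
            // Numeric id for the (source component, source stream) pair.
            _kryoOut.writeInt(_ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true);
            // Message id (used for acking), then the tuple's values.
            tuple.getMessageId().serialize(_kryoOut);
            _kryo.serializeInto(tuple.getValues(), _kryoOut);
            return _kryoOut.toBytes();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }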
It looks like the tuple is null, so the NullPointerException is thrown when
tuple.getSourceTask() is called.
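The trace alone does not show how a null got there. One hypothetical mechanism, consistent with the Troubleshooting note, is two threads appending to one unsynchronized batch at the same time: java.util.ArrayList is documented as not synchronized, and concurrent add calls can lose elements or leave null slots. The sketch below is not Storm's transfer code; `GuardedBatch` is an illustrative name for a lock-guarded buffer that also rejects nulls on entry, so a bad value fails at the caller instead of deep inside a serializer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical batch buffer, NOT Storm's transfer code. Producers may call add()
// from several threads; one lock guards the list, and nulls are rejected up front.
final class GuardedBatch<T> {
    private final List<T> items = new ArrayList<>();

    void add(T item) {
        // Fail fast at the producer rather than later in serialization.
        Objects.requireNonNull(item, "null tuple handed to the batch");
        synchronized (items) {
            items.add(item);
        }
    }

    // Atomically takes everything added so far and resets the buffer.
    List<T> drain() {
        synchronized (items) {
            List<T> out = new ArrayList<>(items);
            items.clear();
            return out;
        }
    }
}
```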


thx


On Tue, Mar 11, 2014 at 1:59 AM, Bobby Evans <ev...@yahoo-inc.com> wrote:

> OutputCollector is not thread safe. Most of it is, but it calls down into
> some code, such as the shuffle grouping, that is not thread safe. The
> simplest way to be sure that what you are doing is safe is to synchronize
> access to the OutputCollector within your own code. If you could send out
> the version of Storm that you are using, we can probably track down exactly
> what is happening here.
>
> —Bobby


Re: Topology hangs after bolt dies with "Async loop died" because KryoTupleSerializer.serialize throws NullPointerException

Posted by Bobby Evans <ev...@yahoo-inc.com>.
OutputCollector is not thread safe. Most of it is, but it calls down into some code, such as the shuffle grouping, that is not thread safe. The simplest way to be sure that what you are doing is safe is to synchronize access to the OutputCollector within your own code. If you could send out the version of Storm that you are using, we can probably track down exactly what is happening here.

—Bobby
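To illustrate why grouping code can make the collector unsafe, here is a hypothetical shuffle-style chooser (not Storm's shuffle grouping implementation) that keeps mutable state: a shuffled order of target task ids and a cursor into it. Without a lock, two threads could read the same cursor or reshuffle the list while another thread is reading it; declaring `next()` synchronized makes each pick atomic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical shuffle-style task chooser for illustration only (NOT Storm's
// shuffle grouping). Its order list and cursor are mutable shared state.
final class ShuffleChooser {
    private final List<Integer> order;
    private final Random random;
    private int cursor;

    ShuffleChooser(List<Integer> targetTasks, long seed) {
        if (targetTasks.isEmpty()) {
            throw new IllegalArgumentException("need at least one target task");
        }
        this.order = new ArrayList<>(targetTasks);
        this.random = new Random(seed);
        Collections.shuffle(order, random);
        this.cursor = 0;
    }

    // synchronized: the check, reshuffle, read, and advance must not interleave
    // with another thread's call, or the cursor and list can get out of step.
    synchronized int next() {
        if (cursor == order.size()) {
            // Start a new pass with a fresh random order.
            Collections.shuffle(order, random);
            cursor = 0;
        }
        return order.get(cursor++);
    }
}
```

With the lock in place, every full pass over the targets is a permutation, so over any whole number of passes each task receives exactly the same number of tuples.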
