Posted to user@storm.apache.org by Srikanth <sr...@gmail.com> on 2015/04/21 22:24:41 UTC

Storm Multilang -- Which queue am I blowing up?

Hello,

I'm writing a Storm topology with a multilang bolt and see that the worker is
crashing consistently. My data flow is like this:

    KafkaSpout -> KafkaDecoderBolt -> RankingBolt(Multilang) -> ...


The problem seems to be in the KafkaDecoderBolt -> RankingBolt part of the
pipeline. The only way I'm able to avoid the crash is by adding a small sleep
of 30ms at the end of KafkaDecoderBolt. This suggests that KafkaDecoderBolt is
emitting tuples much faster than the multilang bolt can process them.
That is indeed the case: the multilang bolt takes ~100ms per tuple, while
KafkaDecoderBolt takes just 2ms. I'm trying to understand which queue I'm
blowing up.

I tried setting max spout pending (the maximum number of outstanding tuples)
to a low value, hoping that the spout would throttle if the multilang bolt is
not able to ack fast enough. That didn't seem to help.

        config.setMaxSpoutPending(50);
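The throttle depends on one rule. A spout task is not asked for new tuples while the number of its emitted-but-not-yet-acked tuples is at the cap. Only tuples the spout emits with a message ID are counted. The cap also reflects the multilang bolt's progress only if KafkaDecoderBolt anchors its emits to the input tuple.

The toy model below is plain Java, not Storm code, and `MaxPendingSketch` and `simulate` are hypothetical names. It sketches the intended effect using the timings from this post. However slowly the consumer acks, the number of pending tuples never goes above the cap.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Toy model (hypothetical, not Storm code) of a max-spout-pending cap:
 * the spout may only emit while fewer than maxPending tuples are un-acked.
 */
public class MaxPendingSketch {
    /** Tick-based simulation; returns the highest pending count observed. */
    static int simulate(int maxPending, int ticks, int consumerEveryNTicks) {
        Deque<Long> pending = new ArrayDeque<>(); // message IDs emitted but not yet acked
        long nextId = 0;
        int peak = 0;
        for (int t = 0; t < ticks; t++) {
            // Spout side: emit only while under the cap; otherwise this tick is skipped.
            if (pending.size() < maxPending) {
                pending.addLast(nextId++);
            }
            // Slow downstream bolt: acks the oldest tuple once every consumerEveryNTicks ticks.
            if (t % consumerEveryNTicks == 0 && !pending.isEmpty()) {
                pending.removeFirst();
            }
            peak = Math.max(peak, pending.size());
        }
        return peak;
    }

    public static void main(String[] args) {
        // ~2 ms producer vs ~100 ms consumer: roughly one ack per 50 producer ticks.
        System.out.println("peak pending = " + simulate(50, 10_000, 50)); // peak pending = 50
    }
}
```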


I also tried increasing the executor buffers, but that didn't help even a
little. I didn't change the network receiver/sender buffers, as my tests were
on a single worker.

        config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384*2);
        config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384*2);
        config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4g -Xms1024m");


I know I need more executors for the multilang bolt, but it looks like there
is one more config setting I need to tweak. I'm not sure which one.
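Here is a rough sizing calculation based on the timings above: about 2ms per tuple in KafkaDecoderBolt and about 100ms in RankingBolt. It assumes that throughput scales linearly with the executor count. It also ignores serialization and pipe overhead between the ShellBolt and its subprocess. The names are hypothetical.

```java
/** Back-of-the-envelope executor count for a slow bolt fed by a faster one (hypothetical helper). */
public class ParallelismEstimate {
    /**
     * Executors needed so the consumer keeps up with the producer, assuming
     * throughput scales linearly with executors (ignores multilang IPC overhead).
     */
    static int requiredExecutors(double producerMsPerTuple, int producerExecutors,
                                 double consumerMsPerTuple) {
        double producerRate = producerExecutors * 1000.0 / producerMsPerTuple; // tuples/s produced
        double perConsumerRate = 1000.0 / consumerMsPerTuple;                  // tuples/s per consumer executor
        return (int) Math.ceil(producerRate / perConsumerRate);
    }

    public static void main(String[] args) {
        // Timings from the post: ~2 ms in KafkaDecoderBolt, ~100 ms in RankingBolt.
        System.out.println(requiredExecutors(2.0, 1, 100.0)); // 50
    }
}
```

Each ShellBolt task starts its own subprocess, so keeping up through parallelism alone would mean running that many RankingBolt processes. The alternative is to let a spout-pending cap hold the producer back.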

Below are the exception and the log output at DEBUG level.

    2015-04-21T15:35:24.954-0400 b.s.d.task [INFO] Emitting: RankingBolt __ack_ack [-5027869909211355112 3891537668110238939]
    2015-04-21T15:35:24.954-0400 b.s.d.executor [INFO] Processing received message source: RankingBolt:4, stream: __ack_ack, id: {}, [-5027869909211355112 3891537668110238939]
    2015-04-21T15:35:24.954-0400 b.s.t.ShellBolt [INFO] ShellLog pid:26767, name:RankingBolt Processed = 1499
    2015-04-21T15:35:24.955-0400 b.s.t.ShellBolt [ERROR] Halting process: ShellBolt died.
    java.lang.RuntimeException: backtype.storm.multilang.NoOutputException: Pipe to subprocess seems to be broken! No output read.
    Serializer Exception:
    Execution halted
            at backtype.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:101) ~[storm-core-0.9.4.jar:0.9.4]
            at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:318) ~[storm-core-0.9.4.jar:0.9.4]
            at java.lang.Thread.run(Thread.java:745) [na:1.7.0_65]



Srikanth

Re: Storm Multilang -- Which queue am I blowing up?

Posted by Srikanth <sr...@gmail.com>.
While trying to debug this, I noticed that ShellProcess still uses log4j
directly instead of slf4j. I'm not sure whether this is intentional.

From ShellProcess.java in storm-core 0.9.3
(<http://grepcode.com/file/repo1.maven.org/maven2/org.apache.storm/storm-core/0.9.3/backtype/storm/utils/ShellProcess.java#>):

    import org.apache.log4j.Logger;

    public class ShellProcess implements Serializable {


I finally figured out the issue. I was reusing an object that was part of a
tuple emitted by the bolt. It looks like emit() was batching these tuples and
holding a reference to my object, so the subsequent call to execute() was
corrupting it.
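Here is a minimal reproduction of this failure mode that does not use Storm. A list of references stands in for the batched, in-process send queue, which is an assumption made for illustration. `Payload`, `emitReused` and `emitFresh` are hypothetical names. A reused mutable object changes underneath the entries that are already queued, while creating a fresh object per tuple keeps each queued value intact.

```java
import java.util.ArrayList;
import java.util.List;

public class ReusedObjectBug {
    /** Hypothetical mutable payload standing in for the object carried in the tuple. */
    static final class Payload {
        int id;
        Payload(int id) { this.id = id; }
    }

    /** Buggy: one Payload instance is mutated and "emitted" for every input. */
    static List<Integer> emitReused(int n) {
        List<Payload> queue = new ArrayList<>(); // stores references, like an in-process queue
        Payload reused = new Payload(-1);
        for (int i = 0; i < n; i++) {
            reused.id = i;       // overwrites the object that earlier queue entries point at
            queue.add(reused);   // "emit" keeps a reference, not a copy
        }
        return drain(queue);
    }

    /** Fixed: a fresh Payload per emitted tuple, so queued entries never change. */
    static List<Integer> emitFresh(int n) {
        List<Payload> queue = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            queue.add(new Payload(i));
        }
        return drain(queue);
    }

    /** Reads the queue later, as the downstream side (or its serializer) would. */
    static List<Integer> drain(List<Payload> queue) {
        List<Integer> seen = new ArrayList<>();
        for (Payload p : queue) seen.add(p.id);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(emitReused(3)); // [2, 2, 2]
        System.out.println(emitFresh(3));  // [0, 1, 2]
    }
}
```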

The object I'm passing is fairly lightweight, so I can get away with creating
a new one for every tuple. But if I had to reuse objects, how could I achieve
this? If I create a pool, how can I size it to match the batch size emit()
uses?
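This sketch looks at the pooling question. A fixed-size round-robin pool is only safe if it never hands out a slot that is still referenced downstream. The pool size therefore has to exceed the number of objects in flight at once, and a slow consumer decides that number, not emit()'s batching. `RingPool` and `simulate` are hypothetical names, and all items are emitted before any is consumed to model the slow bolt.

```java
import java.util.ArrayList;
import java.util.List;

public class RingPoolHazard {
    /** Hypothetical round-robin pool of mutable int holders. */
    static final class RingPool {
        private final int[][] slots;
        private int next = 0;

        RingPool(int size) {
            slots = new int[size][1];
        }

        /** Hands out the next slot whether or not someone still references it. */
        int[] acquire() {
            int[] slot = slots[next];
            next = (next + 1) % slots.length;
            return slot;
        }
    }

    /**
     * Emits inFlight values through a pool of poolSize slots before the
     * consumer reads any of them; returns what the consumer actually sees.
     */
    static List<Integer> simulate(int poolSize, int inFlight) {
        RingPool pool = new RingPool(poolSize);
        List<int[]> queue = new ArrayList<>();
        for (int i = 0; i < inFlight; i++) {
            int[] holder = pool.acquire();
            holder[0] = i;
            queue.add(holder);
        }
        List<Integer> seen = new ArrayList<>();
        for (int[] h : queue) seen.add(h[0]);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(simulate(8, 6)); // [0, 1, 2, 3, 4, 5]  pool larger than in-flight count
        System.out.println(simulate(4, 6)); // [4, 5, 2, 3, 4, 5]  two oldest entries overwritten
    }
}
```

To make a pool safe, the bolt would need to know when the downstream side is finished with each object, and the emitting bolt does not see this directly. Creating a new object per tuple, as above, avoids the problem.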

Srikanth

