Posted to user@storm.apache.org by ram kumar <ra...@gmail.com> on 2016/06/23 07:05:01 UTC

Heartbeat timeout with Python spout

Hi,


*Version:*
    Storm : 0.10.0
    Streamparse : 2.1.4


I am running a Storm topology in Python with streamparse, using "sparse run".

The topology stops executing partway through and throws an exception:


158031 [pool-37-thread-1] ERROR b.s.s.ShellSpout - Halting process: ShellSpout died.
java.lang.RuntimeException: subprocess heartbeat timeout
    at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.10.0.jar:0.10.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_40]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_40]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_40]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_40]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_40]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_40]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]
158036 [pool-37-thread-1] ERROR b.s.d.executor -
java.lang.RuntimeException: subprocess heartbeat timeout
    at backtype.storm.spout.ShellSpout$SpoutHeartbeatTimerTask.run(ShellSpout.java:261) [storm-core-0.10.0.jar:0.10.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_40]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_40]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_40]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_40]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_40]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_40]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_40]


This occurs randomly, and I am unable to trace it back to the cause.

Maybe if the spout takes too long to process, streamparse can't
acknowledge the heartbeat in time.

I changed "supervisor.worker.timeout.secs" from 30 to 600, but the
topology still breaks. Are there any other options here?


Thanks,
Ram.

Re: Heartbeat timeout with Python spout

Posted by Jungtaek Lim <ka...@gmail.com>.
FYI - I submitted a pull request for STORM-1928:
https://github.com/apache/storm/pull/1526

It's against the master branch, but since ShellSpout has not been heavily
modified across minor versions, you can easily apply the patch to Storm
0.10.x and build your own storm-core package if you want to evaluate it.

Hope this helps,

Thanks,
Jungtaek Lim (HeartSaVioR)



Re: Heartbeat timeout with Python spout

Posted by Jungtaek Lim <ka...@gmail.com>.
FYI - I filed an issue here:
https://issues.apache.org/jira/browse/STORM-1928


Re: Heartbeat timeout with Python spout

Posted by Jungtaek Lim <ka...@gmail.com>.
Does get_simple_consumer() block while waiting for messages? If it does, can
we set a timeout on it?
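
To make the question concrete, here is a minimal sketch of a nextTuple body that handles at most one message and then returns, instead of looping over the consumer. It assumes the consumer offers a non-blocking consume(block=False) that returns a message or None; pykafka's SimpleConsumer appears to provide this, plus a consumer_timeout_ms option on get_simple_consumer(), but treat both as assumptions to check against the installed version. FakeConsumer, FakeMessage, parse_record and next_tuple are hypothetical names used only for illustration; the field offsets are copied from the spout sample in this thread.

```python
from collections import deque

# Fixed-width field layout copied from the spout sample in this thread.
FIELDS = [
    ("x_id", 0, 24), ("y_id", 25, 49), ("z_id", 50, 74),
    ("date_stamp", 75, 85), ("time_stamp", 86, 98),
    ("a", 99, 102), ("b", 103, 106), ("c", 107, 110),
    ("start_date", 111, 121), ("start_time", 122, 134),
    ("users", 135, 138),
]


def parse_record(value):
    """Slice one fixed-width record into a dict of field name -> text."""
    return {name: value[start:end] for name, start, end in FIELDS}


def next_tuple(consumer, emit):
    """Handle at most one message, then return so the caller is never stuck.

    `consumer` is assumed to expose consume(block=False) returning a message
    or None; `emit` stands in for storm.emit. Returns True if a tuple was
    emitted, False otherwise.
    """
    message = consumer.consume(block=False)
    if message is None:
        return False  # nothing available: hand control back immediately
    rec = parse_record(message.value)
    stamp = rec["date_stamp"] + " " + rec["time_stamp"]
    start = rec["start_date"] + " " + rec["start_time"]
    if stamp <= start:
        return False  # same filter as the original sample
    emit([rec[name] for name, _, _ in FIELDS])
    return True


class FakeMessage(object):
    """Hypothetical message object exposing only `.value`."""

    def __init__(self, value):
        self.value = value


class FakeConsumer(object):
    """Hypothetical stand-in for a Kafka consumer, used only for the demo."""

    def __init__(self, values):
        self._queue = deque(FakeMessage(v) for v in values)

    def consume(self, block=True):
        # Never waits: returns the next queued message or None.
        return self._queue.popleft() if self._queue else None
```

In a real spout the consumer would be created once (for example in initialize) and kept on self, so that each nextTuple call only polls it.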

Btw, I found an edge case in ShellSpout (not sure whether it matches your
case):
When nextTuple is never called (because of max spout pending, the
backpressure introduced in 1.0.0, and so on), the multi-lang spout receives
no messages from ShellSpout and therefore sends none back, so no heartbeat
is recorded.
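
The edge case can be illustrated with a toy model. This is not ShellSpout's code, only a sketch of the idea described above: the parent remembers when the subprocess last sent anything, and a periodic check fails once that timestamp is older than the timeout. HeartbeatWatchdog and simulate are hypothetical names, and time is counted in whole seconds to keep the example deterministic.

```python
class HeartbeatWatchdog(object):
    """Toy model of the check that raises 'subprocess heartbeat timeout'."""

    def __init__(self, timeout_secs, now=0.0):
        self.timeout_secs = timeout_secs
        self.last_heartbeat = now

    def on_subprocess_message(self, now):
        # In this model, any message from the subprocess is a sign of life.
        self.last_heartbeat = now

    def check(self, now):
        if now - self.last_heartbeat > self.timeout_secs:
            raise RuntimeError("subprocess heartbeat timeout")


def simulate(next_tuple_called_at, timeout_secs, run_until):
    """Return the second at which the watchdog fires, or None if it never does.

    The subprocess only answers when the parent calls nextTuple, so seconds
    missing from `next_tuple_called_at` produce no heartbeat at all.
    """
    watchdog = HeartbeatWatchdog(timeout_secs)
    calls = set(next_tuple_called_at)
    for second in range(run_until + 1):
        if second in calls:
            watchdog.on_subprocess_message(second)
        try:
            watchdog.check(second)
        except RuntimeError:
            return second
    return None
```

With a 30-second timeout, a spout that is polled every second never trips the check, while one that stops being polled after second 9 trips it at second 40, even though the subprocess itself is healthy.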

I'll file an issue and see if it's easy to fix.

Jungtaek Lim (HeartSaVioR)


Re: Heartbeat timeout with Python spout

Posted by cogumelosmaravilha <co...@sapo.pt>.
Python Petrel spout sample:

from pykafka import KafkaClient
from petrel import storm
from petrel.emitter import Spout


class MPDataSpoutInd(Spout):
    """Topology Spout."""

    def __init__(self):
        self.kafclient = KafkaClient(hosts="x.x.x.x:9092")
        super(MPDataSpoutInd, self).__init__(script=__file__)

    @classmethod
    def declareOutputFields(cls):
        return ['x_id', 'y_id', 'z_id', 'date_stamp',
                'time_stamp', 'a', 'b', 'c', 'date_start',
                'time_start', 'users']

    def initialize(self, conf, context):
        topic = self.kafclient.topics['individual_data']
        consumer = topic.get_simple_consumer()
        consumer.reset_offsets()

    def nextTuple(self):
        # A new consumer is created on every call.
        topic = self.kafclient.topics['individual_data']
        consumer = topic.get_simple_consumer()
        # Iterating the consumer waits for messages, so this loop may not return.
        for message in consumer:
            if message is not None:
                # Each record is fixed-width; fields are cut out by position.
                x_id = message.value[0:24]
                y_id = message.value[25:49]
                z_id = message.value[50:74]
                date_stamp = message.value[75:85]
                time_stamp = message.value[86:98]
                a = message.value[99:102]
                b = message.value[103:106]
                c = message.value[107:110]
                start_date = message.value[111:121]
                start_time = message.value[122:134]
                users = message.value[135:138]
                datetime_str = date_stamp + ' ' + time_stamp
                datetime_start = start_date + ' ' + start_time

                if datetime_str > datetime_start:
                    storm.emit([x_id, y_id, z_id, date_stamp,
                                time_stamp, str(a), str(b), str(c),
                                start_date, start_time, str(users)])


def run():
    """Petrel must be."""
    MPDataSpoutInd().run()



Re: Heartbeat timeout with Python spout

Posted by ram kumar <ra...@gmail.com>.
I changed the python kafka version from 1.0.1 to 0.9.5, and it
works fine.

Thanks

On Thu, Jun 23, 2016 at 6:04 PM, Jungtaek Lim <ka...@gmail.com> wrote:

> Could you share the implementation of your spout?
> In multi-lang, user-level functions shouldn't block, so a heartbeat timeout
> will occur if your spout sends a request to Kafka and waits indefinitely
> for the response.
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Thu, Jun 23, 2016 at 7:25 PM, cogumelosmaravilha <co...@sapo.pt> wrote:
>
>> Yes, but if you submit it to Storm, it's OK, whether or not Kafka is empty.
>>
>>
>> On 23-06-2016 10:40, ram kumar wrote:
>>
>> Yes. The same topology works when I run it on a single-node cluster. And
>> when there is no data to consume from Kafka, it shows the heartbeat timeout
>> error.
>>
>> Here, I am testing on a multi-node cluster, and the Kafka server is on a
>> different node.
>>
>> On Thu, Jun 23, 2016 at 1:48 PM, cogumelosmaravilha <co...@sapo.pt> wrote:
>>
>>> Hi,
>>>
>>> It happens to me too, but only when my Kafka is empty.
>>> I'm using Petrel because the generated jar file is really small, around
>>> 300k with Petrel vs 16M with Streamparse.
>>>
>>>

Re: Heartbeat timeout with Python spout

Posted by Jungtaek Lim <ka...@gmail.com>.
Could you share the implementation of your spout?
In multi-lang, user-level functions shouldn't block, so a heartbeat timeout
will occur if your spout sends a request to Kafka and waits indefinitely for
the response.

- Jungtaek Lim (HeartSaVioR)
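
For illustration, here is a minimal sketch of the non-blocking pattern
described above, using only the Python standard library. The names
NonBlockingSource, blocking_fetch, and next_tuple(source, emit) are
hypothetical stand-ins, not the streamparse or Kafka client API. The idea is
that only a background thread is allowed to block on the data source, while
the function that plays the role of the spout's next_tuple returns at once
when nothing is buffered.

```python
import queue
import threading


class NonBlockingSource:
    """Wraps a blocking fetch function so that callers never wait on it.

    blocking_fetch is a hypothetical zero-argument callable that returns one
    item and may block for an arbitrary time (for example, a Kafka read).
    """

    def __init__(self, blocking_fetch, maxsize=1000):
        self._fetch = blocking_fetch
        self._buffer = queue.Queue(maxsize=maxsize)
        # Daemon thread: it must not keep the process alive on shutdown.
        self._thread = threading.Thread(target=self._pump, daemon=True)
        self._thread.start()

    def _pump(self):
        # Only this background thread is allowed to block on the source.
        while True:
            item = self._fetch()
            self._buffer.put(item)

    def poll(self):
        """Return the next buffered item, or None immediately if empty."""
        try:
            return self._buffer.get_nowait()
        except queue.Empty:
            return None


def next_tuple(source, emit):
    """Stand-in for a spout's next_tuple: emit if data is ready, else return.

    emit is a hypothetical callable taking a list of values.
    """
    item = source.poll()
    if item is not None:
        emit([item])
```

In a real spout, the emit call would be the spout's own emit method, and
tuples would still be emitted only from next_tuple, never from the background
thread. Whether this fits depends on the client library; a fetch call that
accepts a short timeout would be a simpler alternative where one is available.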


Re: Heartbeat timeout with Python spout

Posted by cogumelosmaravilha <co...@sapo.pt>.
Yes, but if you submit the topology to Storm, it's OK, whether or not Kafka
is empty.



Re: Heartbeat timeout with Python spout

Posted by ram kumar <ra...@gmail.com>.
Yes. The same topology works when I run it on a single-node cluster. And
when there is no data to consume from Kafka, it shows the heartbeat timeout
error.

Here, I am testing on a multi-node cluster, and the Kafka server is on a
different node.


Re: Heartbeat timeout with Python spout

Posted by cogumelosmaravilha <co...@sapo.pt>.
Hi,

It happens to me too, but only when my Kafka is empty.
I'm using Petrel because the generated jar file is really small: about
300k with Petrel vs. 16M with Streamparse.
