Posted to user@storm.apache.org by Andrey Yegorov <an...@gmail.com> on 2015/09/21 20:01:25 UTC

Re: Frozen topology (KafkaSpout + Multilang bolt)

Hi Alex,

Can you share how you solved or worked around the problem?
I hit something similar and would appreciate any suggestions on how to
deal with it.
Thank you in advance.

----------
Andrey Yegorov

On Thu, Jul 23, 2015 at 4:43 AM, Alex Sobrino <al...@v5tech.es> wrote:

> Hi,
>
> We've got a pretty simple topology running on Storm 0.9.5 (we also tried
> 0.9.4 and 0.9.6-INCUBATING) in a 3-machine cluster:
>
> kafkaSpout (3) -----> processBolt (12)
>
> Some info:
> - kafkaSpout reads from a topic with 3 partitions and 2 replicas
> - processBolt iterates through the message and saves the results in
> MongoDB
> - processBolt is implemented in Python and calls storm.log("I'm doing
> something") just to add a simple debug message to the logs
> - The messages can be quite big (~25-40 MB) and are in JSON format
> - The Kafka topic has a retention of 2 hours
> - We use the same ZooKeeper cluster for both Kafka and Storm
>
> The topology freezes after running for several hours (not days). We don't
> see any messages in the logs... In fact, the periodic messages from
> s.k.KafkaUtils and s.k.ZkCoordinator disappear. As you can imagine, the
> message from the bolt also disappears. Logs are copied below. If we
> redeploy the topology, everything works again until it freezes
> once more.
>
>
>
> Our kafkaSpout config is:
>
> ZkHosts zkHosts = new ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
> SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic",
> "/topic/ourclientid", "ourclientid");
> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> kafkaConfig.fetchSizeBytes = 50*1024*1024;
> kafkaConfig.bufferSizeBytes = 50*1024*1024;
>
> We've also tried setting the following options:
>
> kafkaConfig.forceFromStart = true;
> // Also tried with kafka.api.OffsetRequest.LatestTime()
> kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
> kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>
> Right now the topology is running without acking the messages since
> there's a bug in kafkaSpout with failed messages and deleted offsets in
> Kafka.
>
>
>
> This is what can be seen in the logs in one of the workers:
>
> 2015-07-23T12:37:38.008+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
> name:processBolt I'm doing something
> 2015-07-23T12:37:39.079+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
> name:processBolt I'm doing something
> 2015-07-23T12:37:51.013+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
> name:processBolt I'm doing something
> 2015-07-23T12:37:51.091+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
> name:processBolt I'm doing something
> 2015-07-23T12:38:02.684+0200 s.k.ZkCoordinator [INFO] Task [2/3]
> Refreshing partition manager connections
> 2015-07-23T12:38:02.687+0200 s.k.DynamicBrokersReader [INFO] Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
> 2=kafka3:9092}}
> 2015-07-23T12:38:02.687+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
> [Partition{host=kafka2, partition=1}]
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
> partition managers: []
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
> partition managers: []
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished
> refreshing
> 2015-07-23T12:38:09.012+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
> name:processBolt I'm doing something
> 2015-07-23T12:38:41.878+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364,
> name:processBolt I'm doing something
> 2015-07-23T12:39:02.688+0200 s.k.ZkCoordinator [INFO] Task [2/3]
> Refreshing partition manager connections
> 2015-07-23T12:39:02.691+0200 s.k.DynamicBrokersReader [INFO] Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
> 2=kafka3:9092}}
> 2015-07-23T12:39:02.691+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
> [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
> partition managers: []
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
> partition managers: []
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished
> refreshing
> 2015-07-23T12:40:02.692+0200 s.k.ZkCoordinator [INFO] Task [2/3]
> Refreshing partition manager connections
> 2015-07-23T12:40:02.695+0200 s.k.DynamicBrokersReader [INFO] Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
> 2=kafka3:9092}}
> 2015-07-23T12:40:02.695+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
> [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
> partition managers: []
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
> partition managers: []
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished
> refreshing
> 2015-07-23T12:41:02.696+0200 s.k.ZkCoordinator [INFO] Task [2/3]
> Refreshing partition manager connections
> 2015-07-23T12:41:02.699+0200 s.k.DynamicBrokersReader [INFO] Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
> 2=kafka3:9092}}
> 2015-07-23T12:41:02.699+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
> [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
> partition managers: []
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
> partition managers: []
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished
> refreshing
> 2015-07-23T12:42:02.735+0200 s.k.ZkCoordinator [INFO] Task [2/3]
> Refreshing partition manager connections
> 2015-07-23T12:42:02.737+0200 s.k.DynamicBrokersReader [INFO] Read
> partition info from zookeeper:
> GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092,
> 2=kafka3:9092}}
> 2015-07-23T12:42:02.737+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned
> [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted
> partition managers: []
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] New
> partition managers: []
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished
> refreshing
>
>
> After that the topology freezes. Nothing is written to the nimbus log.
> We've checked the offsets in ZooKeeper and they're not being updated:
>
>
> {"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1","name":"our-topology"},"offset":12047144,"partition":1,"broker":{"host":"kafka2","port":9092},"topic":"topic"}
> cZxid = 0x100028958
> ctime = Wed Jul 01 12:22:36 CEST 2015
> mZxid = 0x100518527
> mtime = Thu Jul 23 12:42:41 CEST 2015
> pZxid = 0x100028958
> cversion = 0
> dataVersion = 446913
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 183
> numChildren = 0
>
>
>
> Any ideas of what we could be missing? Should we open a Jira Issue?
>
> Thanks!
>
> --
> Alex Sobrino Beltrán
> Registered Linux User #273657
>
> http://v5tech.es
>
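
One way to confirm that the consumer offset has really stopped moving is to
read the partition node twice and compare the two payloads. A minimal sketch,
assuming the node payload has the JSON shape quoted above; the function names
are hypothetical, and fetching the payload from ZooKeeper is left out:

```python
import json

def read_offset(node_payload):
    """Extract (partition, offset) from the JSON stored in the ZooKeeper node."""
    data = json.loads(node_payload)
    return data["partition"], data["offset"]

def is_stalled(earlier_payload, later_payload):
    """True when two samples of the same partition node report the same offset."""
    p1, o1 = read_offset(earlier_payload)
    p2, o2 = read_offset(later_payload)
    if p1 != p2:
        raise ValueError("samples come from different partitions")
    return o1 == o2

# Payload copied from the message above.
sample = '{"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1","name":"our-topology"},"offset":12047144,"partition":1,"broker":{"host":"kafka2","port":9092},"topic":"topic"}'
```

Take the two samples far enough apart that a healthy spout would have
written a new offset in between; otherwise an unchanged value proves little.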

Re: Frozen topology (KafkaSpout + Multilang bolt)

Posted by Abhishek Agarwal <ab...@gmail.com>.
In such situations, it's better to take a jstack dump and see where things
are stuck.
@Andrey - Does your spout emit multiple tuples in a single nextTuple call?
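
A thread dump of a worker can be long, so it may help to tally thread states
first and then read the BLOCKED or WAITING threads in detail. A rough sketch,
assuming the dump was saved as text from `jstack <pid>`; the helper name is
hypothetical and the sample dump below is made up for illustration, not taken
from a real worker:

```python
import re
from collections import Counter

# jstack prints one "java.lang.Thread.State: <STATE>" line per Java thread.
STATE_LINE = re.compile(r"^\s*java\.lang\.Thread\.State: (\w+)")

def count_thread_states(dump_text):
    """Count how many threads in a jstack dump are in each thread state."""
    states = Counter()
    for line in dump_text.splitlines():
        match = STATE_LINE.match(line)
        if match:
            states[match.group(1)] += 1
    return states

# Invented sample; thread names and ids are placeholders.
sample_dump = '''
"worker-1" prio=10 tid=0x1 nid=0x2 waiting on condition
   java.lang.Thread.State: WAITING (parking)
"worker-2" prio=10 tid=0x3 nid=0x4 runnable
   java.lang.Thread.State: RUNNABLE
"worker-3" prio=10 tid=0x5 nid=0x6 waiting on condition
   java.lang.Thread.State: WAITING (parking)
'''
```

Comparing the tallies from two dumps taken a minute apart gives a quick hint
of whether the same threads stay parked.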

On Tue, Sep 22, 2015 at 11:35 PM, Jitendra Yadav <je...@gmail.com>
wrote:

> We faced a similar issue in the past. After several days of research we
> found that the topology was freezing after 5-6 minutes because of a bug in
> our code: we were trying to acknowledge an unanchored tuple in one of our
> bolts. That doesn't really make sense, but it led to this weird
> situation.
>
> Thanks
> Jitendra
>
> On Tue, Sep 22, 2015 at 8:34 AM, Andrey Yegorov <an...@gmail.com>
> wrote:
>
>> FWIW, I am not using multilang.
>>
>> Topology is written in java; I am using storm 0.9.4.
>>
>> ----------
>> Andrey Yegorov
>>
>> On Tue, Sep 22, 2015 at 4:20 AM, Alex Sobrino <al...@v5tech.es> wrote:
>>
>>> Hi guys,
>>>
>>> What we've seen so far is that it's *not* a KafkaSpout-only issue: it's
>>> related to how Storm's multilang protocol is implemented in Python.
>>>
>>> If the bolt's process() execution time is long enough, the
>>> corresponding heartbeat tuples time out. That leaves you in a situation
>>> where your bolt is ready to process another tuple but Storm thinks it had
>>> some kind of problem. If I remember correctly, Storm "hangs" waiting for
>>> the multilang process to print an error on stderr.
>>> That'll never happen, and thus the topology seems to be hung.
>>>
>>> Storm thinks the multilang process had a problem and is waiting for a
>>> stderr message, while the multilang process is waiting for another
>>> tuple... I think this issue covers the problem:
>>> https://issues.apache.org/jira/browse/STORM-738
>>>
>>> What we've done in our projects is either:
>>> 1) Rewrite the bolt implementation in Java
>>> 2) Fall back to Storm 0.9.2, where the multilang protocol didn't have to
>>> handle heartbeat tuples
>>>
>>> Hope this helps...
>>>
>>> On Tue, Sep 22, 2015 at 1:05 PM, Abhishek Agarwal <ab...@gmail.com>
>>> wrote:
>>>
>>>> Could this be the issue you guys are facing?
>>>> https://issues.apache.org/jira/browse/STORM-1027
>>>> FYI, the above can happen for non-Kafka sources as well.
>>>>
>>>> On Mon, Sep 21, 2015 at 11:39 PM, Onur Yalazı <on...@8digits.com>
>>>> wrote:
>>>>
>>>>> I think we have a similar issue. We are using beanstalkd as a message
>>>>> source, so it's not Kafka-related in our case.
>>>>>
>>>>> We normally have 30 MB/s of traffic between nodes, and we log a few
>>>>> durations from the topology. Whenever the topology freezes, traffic
>>>>> drops to 200 KB/s, complete latency drops drastically, and fail counts
>>>>> go to zero. We see only a fraction of our duration logs and the
>>>>> service stops working.
>>>>>
>>>>> Any ideas?
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Abhishek Agarwal
>>>>
>>>>
>>>
>>>
>>> --
>>> Alex Sobrino Beltrán
>>> Registered Linux User #273657
>>>
>>> http://v5tech.es
>>>
>>
>>
>


-- 
Regards,
Abhishek Agarwal
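
For readers hitting the multilang case Alex describes in the quoted reply:
in versions after 0.9.2 the shell process receives heartbeat tuples and is
expected to answer each one with a sync command. A simplified sketch of that
dispatch, assuming the heartbeat arrives as a tuple from task -1 on the
`__heartbeat` stream as the multilang protocol documentation describes;
`handle_message` and the `process` callback are hypothetical names, and the
real storm.py exchanges these messages over stdin/stdout rather than
returning them:

```python
import json

HEARTBEAT_STREAM = "__heartbeat"

def is_heartbeat(tup):
    """A heartbeat is a tuple sent from task -1 on the __heartbeat stream."""
    return tup.get("task") == -1 and tup.get("stream") == HEARTBEAT_STREAM

def handle_message(tup, process):
    """Return the replies the bolt should write back for one incoming tuple."""
    if is_heartbeat(tup):
        # Answer with sync so the parent knows the subprocess is alive.
        return [{"command": "sync"}]
    process(tup)
    return [{"command": "ack", "id": tup["id"]}]

seen = []
heartbeat = json.loads(
    '{"id": "1", "comp": "1", "stream": "__heartbeat", "task": -1, "tuple": []}'
)
normal = {"id": "2", "comp": "kafkaSpout", "stream": "default",
          "task": 3, "tuple": ["payload"]}
replies = handle_message(heartbeat, seen.append) + handle_message(normal, seen.append)
```

Because messages are handled one at a time, a heartbeat that arrives while
process() is still busy with a large message is only answered after process()
returns, which is consistent with the timeout Alex reports for slow bolts.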

Re: Frozen topology (KafkaSpout + Multilang bolt)

Posted by Jitendra Yadav <je...@gmail.com>.
We faced a similar issue in the past. After several days of research we
found that the topology was freezing after 5-6 minutes because of a bug in
our code: we were trying to acknowledge an unanchored tuple in one of our
bolts. That doesn't really make sense, but it led to this weird
situation.

Thanks
Jitendra


Re: Frozen topology (KafkaSpout + Multilang bolt)

Posted by Andrey Yegorov <an...@gmail.com>.
FWIW, I am not using multilang.

The topology is written in Java; I am using Storm 0.9.4.

----------
Andrey Yegorov

On Tue, Sep 22, 2015 at 4:20 AM, Alex Sobrino <al...@v5tech.es> wrote:

> Hi guys,
>
> What we've seen so far is that it's *not* a KafkaSpout-only issue: it's
> related to how Storm and multilang protocol is implemented in Python.
>
> If the bolt's process() execution time is big enough, then the
> corresponding heartbeat tuples time out. That leaves you in a situation
> where your bolt is ready to process another tuple but Storm thinks it had
> some king of problem. If I remember correctly, Storm ""hangs"" waiting for
> the multilang process error being printed through the stderr output.
> That'll never happen, and thus the topology seems to be hung.
>
> Storm thinks the multilang process had a problem, is waiting for a stderr
> message but the multilang process is waiting for another tuple... I think
> this issue covers this problem:
> https://issues.apache.org/jira/browse/STORM-738
>
> What we've done in our projects is either:
> 1) Rewrite the Bolts implementation in Java
> 2) Fall back to Storm 0.9.2 where the multilang protocol didn't have to
> handle heartbeat tuples
>
> Hope this helps...
>

Re: Frozen topology (KafkaSpout + Multilang bolt)

Posted by Alex Sobrino <al...@v5tech.es>.
Hi guys,

What we've seen so far is that it's *not* a KafkaSpout-only issue: it's
related to how Storm's multilang protocol is implemented in Python.

If the bolt's process() execution time is long enough, the
corresponding heartbeat tuples time out. That leaves you in a situation
where your bolt is ready to process another tuple but Storm thinks it had
some kind of problem. If I remember correctly, Storm "hangs" waiting for
the multilang process's error to be printed to stderr.
That never happens, and thus the topology appears to be hung.

Storm thinks the multilang process had a problem and is waiting for a stderr
message, while the multilang process is waiting for another tuple. I think
this issue covers the problem:
https://issues.apache.org/jira/browse/STORM-738
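
To make the heartbeat handshake concrete, here is a minimal Python sketch. It assumes the heartbeat format described in Storm's multilang protocol documentation (a tuple with task -1 on the "__heartbeat" stream, answered with a "sync" command). The function names and the process callback are hypothetical, and this is not the code of the storm.py module shipped with Storm.

```python
import json

# Values assumed from the multilang protocol documentation.
HEARTBEAT_STREAM = "__heartbeat"
HEARTBEAT_TASK = -1


def is_heartbeat(tup):
    """Return True if a decoded multilang tuple is a ShellBolt heartbeat."""
    return (tup.get("task") == HEARTBEAT_TASK
            and tup.get("stream") == HEARTBEAT_STREAM)


def handle_message(tup, process):
    """Return the commands to send back to Storm for one incoming tuple.

    `process` is a hypothetical user callback that does the real work.
    """
    if is_heartbeat(tup):
        # A heartbeat is answered with "sync", never processed or acked.
        return [{"command": "sync"}]
    process(tup)
    return [{"command": "ack", "id": tup["id"]}]


def encode(command):
    """Frame a command as the protocol expects: JSON, then a line "end"."""
    return json.dumps(command) + "\nend\n"
```

Because handle_message runs on a single thread, a heartbeat that arrives while process() is still busy is only answered after process() returns. With a slow enough process(), that reply can arrive after the heartbeat has already timed out, which is the situation described above.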

What we've done in our projects is one of the following:
1) Rewrite the bolt implementations in Java
2) Fall back to Storm 0.9.2, where the multilang protocol didn't have to
handle heartbeat tuples

Hope this helps...

On Tue, Sep 22, 2015 at 1:05 PM, Abhishek Agarwal <ab...@gmail.com>
wrote:

> Could this be the issue you guys are facing?
> https://issues.apache.org/jira/browse/STORM-1027
> FYI, the above can happen with non-Kafka sources as well.


-- 
Alex Sobrino Beltrán
Registered Linux User #273657

http://v5tech.es

Re: Frozen topology (KafkaSpout + Multilang bolt)

Posted by Abhishek Agarwal <ab...@gmail.com>.
Could this be the issue you guys are facing?
https://issues.apache.org/jira/browse/STORM-1027
FYI, the above can happen with non-Kafka sources as well.

On Mon, Sep 21, 2015 at 11:39 PM, Onur Yalazı <on...@8digits.com>
wrote:

> I think we have a similar issue. We are using beanstalkd as a message
> source, so it's not Kafka-related in our case.
>
> We normally have 30 MB/s of traffic between nodes, and we log a few of the
> topology's durations. Whenever the topology freezes, traffic drops to
> 200 KB/s, complete latency drops drastically, and fail counts go to
> zero. We see only a fraction of our duration logs and the service stops working.
>
> Any ideas?


-- 
Regards,
Abhishek Agarwal

Re: Frozen topology (KafkaSpout + Multilang bolt)

Posted by Onur Yalazı <on...@8digits.com>.
I think we have a similar issue. We are using beanstalkd as a message
source, so it's not Kafka-related in our case.

We normally have 30 MB/s of traffic between nodes, and we log a few of the
topology's durations. Whenever the topology freezes, traffic drops to
200 KB/s, complete latency drops drastically, and fail counts go to
zero. We see only a fraction of our duration logs and the service stops working.

Any ideas?
On Sep 21, 2015 9:01 PM, "Andrey Yegorov" <an...@gmail.com> wrote:

> Hi Alex,
>
> Can you share how you solved or worked around the problem?
> I hit something similar and I would appreciate any suggestions on how to
> deal with it.
> Thank you in advance.
>
> ----------
> Andrey Yegorov
>