Posted to dev@storm.apache.org by "Kirill Prasalov (JIRA)" <ji...@apache.org> on 2016/03/05 03:43:40 UTC

[jira] [Commented] (STORM-963) Frozen topology (KafkaSpout + Multilang bolt)

    [ https://issues.apache.org/jira/browse/STORM-963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15181463#comment-15181463 ] 

Kirill Prasalov commented on STORM-963:
---------------------------------------

Hi! I have the same problem.

Storm v0.9.4. Pure Java. A simplistic Trident topology reading from Kafka. SINGLE NODE (i.e. only one worker and a parallelism of one - this is important).

The topology consists of just an OpaqueTridentKafkaSpout and a standard Debug filter. They are translated into a standard Storm spout and three bolts: $mastercoord-bg0, $spoutcoord-spout0, spout0, b-0. The topology does nothing no matter what I send to the Kafka topic. Silence in the logs. The sun is shining. Birds are singing.

This very same issue has appeared from time to time. Some time ago I deployed a new topology and soon realized it was processing just a single batch every minute and was idle between those batches. Then I created the simplistic one with a single Debug filter but with two workers in a three-node cluster (Nimbus + two Supervisors). The behavior was reproduced. What I observed was that those batches with the one-minute delays between them were processed on a different node than the one where my spout was deployed.

Then I simplified the configuration down to a single worker and voila! The topology became idle and even redeploying it didn't help. So I hacked the Trident code a little by adding log output to several classes and found the following: $mastercoord-bg0 works just fine and so does $spoutcoord-spout0, but then the tuples get lost on the way to spout0. The only tuples spout0 is able to see are ticks; the Kafka broker info emitted from $spoutcoord-spout0 never reaches its destination. After 60 secs, which is Storm's sync timeout, $mastercoord-bg0 fails its transaction attempt.

My hand-made log output from the Trident classes looks like this:

2016-03-05T03:32:39.695+0100 storm.trident.topology.MasterBatchCoordinator [DEBUG] Emitted: 14813273:110
2016-03-05T03:32:39.695+0100 storm.trident.spout.TridentSpoutCoordinator [DEBUG] Got a transaction: 14813273:110
2016-03-05T03:32:39.697+0100 storm.trident.spout.TridentSpoutCoordinator [DEBUG] The transaction 14813273 triggers a new batch with metadata GlobalPartitionInformation{partitionMap={0=test-kafka3:9092}}
2016-03-05T03:32:39.934+0100 storm.trident.topology.TridentBoltExecutor [DEBUG] Got a tuple: source: __system:-1, stream: __tick, id: {}, [5]
2016-03-05T03:32:44.935+0100 storm.trident.topology.TridentBoltExecutor [DEBUG] Got a tuple: source: __system:-1, stream: __tick, id: {}, [5]
2016-03-05T03:32:49.935+0100 storm.trident.topology.TridentBoltExecutor [DEBUG] Got a tuple: source: __system:-1, stream: __tick, id: {}, [5]
2016-03-05T03:32:54.935+0100 storm.trident.topology.TridentBoltExecutor [DEBUG] Got a tuple: source: __system:-1, stream: __tick, id: {}, [5]
2016-03-05T03:32:59.935+0100 storm.trident.topology.TridentBoltExecutor [DEBUG] Got a tuple: source: __system:-1, stream: __tick, id: {}, [5]
2016-03-05T03:33:04.936+0100 storm.trident.topology.TridentBoltExecutor [DEBUG] Got a tuple: source: __system:-1, stream: __tick, id: {}, [5]
2016-03-05T03:33:09.935+0100 storm.trident.topology.TridentBoltExecutor [DEBUG] Got a tuple: source: __system:-1, stream: __tick, id: {}, [5]
2016-03-05T03:33:14.936+0100 storm.trident.topology.TridentBoltExecutor [DEBUG] Got a tuple: source: __system:-1, stream: __tick, id: {}, [5]
2016-03-05T03:33:19.936+0100 storm.trident.topology.TridentBoltExecutor [DEBUG] Got a tuple: source: __system:-1, stream: __tick, id: {}, [5]
2016-03-05T03:33:24.936+0100 storm.trident.topology.TridentBoltExecutor [DEBUG] Got a tuple: source: __system:-1, stream: __tick, id: {}, [5]
2016-03-05T03:33:29.936+0100 storm.trident.topology.TridentBoltExecutor [DEBUG] Got a tuple: source: __system:-1, stream: __tick, id: {}, [5]
2016-03-05T03:33:34.937+0100 storm.trident.topology.TridentBoltExecutor [DEBUG] Got a tuple: source: __system:-1, stream: __tick, id: {}, [5]
2016-03-05T03:33:39.693+0100 storm.trident.topology.MasterBatchCoordinator [DEBUG] Failed the transaction 14813273
2016-03-05T03:33:39.696+0100 storm.trident.topology.MasterBatchCoordinator [DEBUG] Emitted: 14813273:111
2016-03-05T03:33:39.696+0100 storm.trident.spout.TridentSpoutCoordinator [DEBUG] Got a transaction: 14813273:111
2016-03-05T03:33:39.700+0100 storm.trident.spout.TridentSpoutCoordinator [DEBUG] The transaction 14813273 triggers a new batch with metadata GlobalPartitionInformation{partitionMap={0=test-kafka3:9092}}
.....................
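
For anyone who wants to check the timing in the excerpt above, here is a minimal sketch that measures the gap between two log lines. It is not part of Storm or Trident: the class name TransactionTimeout and its methods are made up for illustration, and it only assumes that each line starts with a timestamp in the format shown in the log.

```java
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not a Storm class: measures the time between two log lines.
public class TransactionTimeout {
    // Timestamp layout used in the log above, e.g. 2016-03-05T03:32:39.695+0100
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

    // Parses the leading timestamp (everything before the first space).
    static OffsetDateTime timestampOf(String logLine) {
        String stamp = logLine.substring(0, logLine.indexOf(' '));
        return OffsetDateTime.parse(stamp, LOG_TIME);
    }

    // Milliseconds elapsed from the first line to the second.
    static long millisBetween(String earlierLine, String laterLine) {
        return Duration.between(timestampOf(earlierLine), timestampOf(laterLine)).toMillis();
    }

    public static void main(String[] args) {
        String emitted = "2016-03-05T03:32:39.695+0100 storm.trident.topology.MasterBatchCoordinator"
                + " [DEBUG] Emitted: 14813273:110";
        String failed = "2016-03-05T03:33:39.693+0100 storm.trident.topology.MasterBatchCoordinator"
                + " [DEBUG] Failed the transaction 14813273";
        System.out.println(millisBetween(emitted, failed) + " ms");
    }
}
```

Run against the "Emitted: 14813273:110" and "Failed the transaction 14813273" lines, it prints 59998 ms, i.e. the attempt is failed almost exactly 60 seconds after it was emitted, while the ticks in between arrive about 5 seconds apart.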

Also I noticed that every time I submit the topology to the cluster, I see the following in its worker-670*.log:

2016-03-05T01:41:49.057+0100 b.s.m.n.Client [ERROR] connection to Netty-Client-localhost/127.0.0.1:6706 is unavailable
2016-03-05T01:41:49.057+0100 b.s.m.n.Client [ERROR] dropping 1 message(s) destined for Netty-Client-localhost/127.0.0.1:6706

I don't know if it's relevant or not...

Also attaching the jstack output (jstack-bopcat.txt).

The topology is still frozen btw after several days and many redeploys.


> Frozen topology (KafkaSpout + Multilang bolt)
> ---------------------------------------------
>
>                 Key: STORM-963
>                 URL: https://issues.apache.org/jira/browse/STORM-963
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka, storm-multilang
>    Affects Versions: 0.9.4, 0.9.5, 0.9.6
>         Environment: - VMware ESX 5.5 
> - Ubuntu Server 14.04 LTS (kernel 3.16.0-41-generic)
> - Java (TM) SE Runtime Environment (build 1.8.0_45-b14)
> - Python 2.7.6 (default, Jun 22 2015, 17:58:13)
> - Zookeeper 3.4.6
>            Reporter: Alex Sobrino
>              Labels: multilang
>         Attachments: dump, jstack-bopcat.txt, jstack.txt
>
>
> Hi,
> We've got a pretty simple topology running with Storm 0.9.5 (tried also with 0.9.4 and 0.9.6-INCUBATING) in a 3 machine cluster:
> {code}kafkaSpout (3) -----> processBolt (12){code}
> Some info:
> - kafkaSpout reads from a topic with 3 partitions and 2 replications
> - processBolt iterates through the message and saves the results in MongoDB
> - processBolt is implemented in Python and has a storm.log("I'm doing something") just to add a simple debug message in the logs
> - The messages can be quite big (~25-40 MB) and are in JSON format
> - The kafka topic has a retention of 2 hours
> - We use the same ZooKeeper cluster for both Kafka and Storm
> The topology gets frozen after several hours (not days) of running. We don't see any message in the logs... In fact, the periodic message from s.k.KafkaUtils and s.k.ZkCoordinator disappears. As you can imagine, the message from the Bolt also disappears. Logs are copy/pasted further on. If we redeploy the topology everything starts to work again until it becomes frozen again.
> Our kafkaSpout config is:
> {code}
> ZkHosts zkHosts = new ZkHosts("zkhost01:2181,zkhost02:2181,zkhost03:2181");
> SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "topic", "/topic/ourclientid", "ourclientid");
> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> kafkaConfig.fetchSizeBytes = 50*1024*1024;
> kafkaConfig.bufferSizeBytes = 50*1024*1024;
> {code}
> We've also tried setting the following options
> {code}
> kafkaConfig.forceFromStart = true;
> kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // Also with kafka.api.OffsetRequest.LatestTime();
> kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
> {code}
> Right now the topology is running without acking the messages since there's a bug in kafkaSpout with failed messages and deleted offsets in Kafka.
> This is what can be seen in the logs in one of the workers:
> {code}
> 2015-07-23T12:37:38.008+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
> 2015-07-23T12:37:39.079+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
> 2015-07-23T12:37:51.013+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
> 2015-07-23T12:37:51.091+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
> 2015-07-23T12:38:02.684+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
> 2015-07-23T12:38:02.687+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:38:02.687+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2, partition=1}]
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
> 2015-07-23T12:38:02.687+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
> 2015-07-23T12:38:09.012+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
> 2015-07-23T12:38:41.878+0200 b.s.t.ShellBolt [INFO] ShellLog pid:28364, name:processBolt I'm doing something
> 2015-07-23T12:39:02.688+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
> 2015-07-23T12:39:02.691+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:39:02.691+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
> 2015-07-23T12:39:02.691+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
> 2015-07-23T12:40:02.692+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
> 2015-07-23T12:40:02.695+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:40:02.695+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
> 2015-07-23T12:40:02.695+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
> 2015-07-23T12:41:02.696+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
> 2015-07-23T12:41:02.699+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:41:02.699+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
> 2015-07-23T12:41:02.699+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
> 2015-07-23T12:42:02.735+0200 s.k.ZkCoordinator [INFO] Task [2/3] Refreshing partition manager connections
> 2015-07-23T12:42:02.737+0200 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=kafka1:9092, 1=kafka2:9092, 2=kafka3:9092}}
> 2015-07-23T12:42:02.737+0200 s.k.KafkaUtils [INFO] Task [2/3] assigned [Partition{host=kafka2:9092, partition=1}]
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Deleted partition managers: []
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] New partition managers: []
> 2015-07-23T12:42:02.737+0200 s.k.ZkCoordinator [INFO] Task [2/3] Finished refreshing
> {code}
> and then it becomes frozen. Nothing is written into the nimbus log. We've checked the offsets in ZooKeeper and they're not updated:
> {code}
> {"topology":{"id":"218e58a5-6bfb-4b32-ae89-f3afa19306e1","name":"our-topology"},"offset":12047144,"partition":1,"broker":{"host":"kafka2","port":9092},"topic":"topic"}
> cZxid = 0x100028958
> ctime = Wed Jul 01 12:22:36 CEST 2015
> mZxid = 0x100518527
> mtime = Thu Jul 23 12:42:41 CEST 2015
> pZxid = 0x100028958
> cversion = 0
> dataVersion = 446913
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 183
> numChildren = 0
> {code}
> Any ideas of what we could be missing?
> PS: This was sent to the Storm user's mailing list and got 0 replies :\


