Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2017/08/03 18:35:00 UTC

[jira] [Commented] (STORM-2675) KafkaTridentSpoutOpaque not committing offsets to Kafka

    [ https://issues.apache.org/jira/browse/STORM-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113255#comment-16113255 ] 

Stig Rohde Døssing commented on STORM-2675:
-------------------------------------------

Hi.

Sorry about the following wall of text. I wrote some notes while debugging this, and I thought they might be helpful for understanding how Trident works.

As I understand it, Trident spouts don't save progress to Kafka; the progress should be saved to Storm's Zookeeper instead. The OpaquePartitionedTridentKafkaSpout should be getting passed the previous batch's ending offset as part of the metadata passed to emitPartitionBatch.

I'm not very experienced with how Trident works internally, but from what I can tell, the spout is asked to emit a batch with a given transaction id via emitPartitionBatch, along with some metadata describing the last batch. The return value is some metadata that can be used to construct the following batch later; in Kafka's case that metadata is the starting and ending offsets of the batch. The metadata is saved to Zookeeper once state has been committed for the batch. When a batch succeeds, the metadata for earlier batches is removed from Zookeeper. If the spout worker is restarted, the last committed metadata is read back from Zookeeper, which should determine which offset the spout restarts at.
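To make that flow a bit more concrete, here is a minimal standalone sketch of the metadata round trip, with a plain map standing in for Zookeeper (the class and method names below are made up for illustration; the real logic lives in OpaquePartitionedTridentSpoutExecutor and the Kafka emitter):
{code}
import java.util.HashMap;
import java.util.Map;

public class BatchMetadataFlowSketch {
    // Made-up per-batch metadata, analogous in shape to the Kafka spout's metadata.
    static class BatchMeta {
        final long firstOffset;
        final long lastOffset;
        BatchMeta(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
    }

    // Stand-in for the per-partition Zookeeper node, keyed by transaction id.
    static final Map<Long, BatchMeta> zkStandIn = new HashMap<>();

    // Hypothetical emitter: the previous batch's metadata says where to resume.
    static BatchMeta emitPartitionBatch(long txid, BatchMeta lastMeta) {
        long start = (lastMeta == null) ? 0 : lastMeta.lastOffset + 1;
        long end = start + 9; // pretend this batch covered 10 offsets
        return new BatchMeta(start, end);
    }

    public static void main(String[] args) {
        BatchMeta first = emitPartitionBatch(1, zkStandIn.get(0L));  // no prior metadata -> start at 0
        zkStandIn.put(1L, first);                                    // commit: metadata persisted for txid 1
        BatchMeta second = emitPartitionBatch(2, zkStandIn.get(1L)); // next batch resumes at lastOffset + 1
        System.out.println("second batch starts at offset " + second.firstOffset); // prints 10
    }
}
{code}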

The code that wraps around the OpaquePartitionedTridentKafkaSpout is https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java. The writes to Zookeeper should happen via the RotatingTransactionalState object here: https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L116. If the spout has just restarted and there is no in-memory metadata for the last batch, the last committed metadata is read from Zookeeper here: https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L140.

Here is the metadata returned by emitPartitionBatch from the Kafka spout: https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L130
It should contain the first and last offsets for the current batch.

The metadata path in Zookeeper is constructed from these two classes: https://github.com/apache/storm/blob/64e29f365c9b5d3e15b33f33ab64e200345333e4/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java#L63 and https://github.com/apache/storm/blob/a4afacd9617d620f50cf026fc599821f7ac25c79/storm-client/src/jvm/org/apache/storm/trident/topology/state/RotatingTransactionalState.java. OpaquePartitionedTridentSpoutExecutor will create instances of each of these, based on the partition ids returned by the spout emitter. For example, for me the Zookeeper paths for the metadata of the Trident spout in the examples/storm-kafka-client-examples TridentKafkaClientWordCountNamedTopics topology were /transactional/spout1/user/test-trident@0 and /transactional/spout1/user/test-trident-1@0. The path format for the Kafka spout is /$transactional.zookeeper.root/$spoutComponentName/user/$topicName@partitionNumber.
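As a quick illustration of that format, here is a trivial sketch that reassembles the path from its parts (the values are just the ones from my run, and the snippet assumes the default transactional.zookeeper.root of /transactional):
{code}
// Sketch: reassembling the metadata path from its parts, per the format above.
public class ZkPathSketch {
    public static void main(String[] args) {
        String zkRoot = "/transactional";   // transactional.zookeeper.root (default)
        String spoutComponent = "spout1";   // the Trident spout's component name
        String topic = "test-trident";
        int partition = 0;
        System.out.println(zkRoot + "/" + spoutComponent + "/user/" + topic + "@" + partition);
        // prints /transactional/spout1/user/test-trident@0
    }
}
{code}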

When I ran the TridentKafkaClientWordCountNamedTopics example on master, shut it down, and restarted it, I think I got the behavior you described. Here is the Zookeeper metadata after I shut down the example topology the first time:
{code}
get /transactional/spout1/user/test-trident@0/6204
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata@507e1faf{topicPartition=test-trident-0, firstOffset=1412, lastOffset=1412}
{code}
After restart:
{code}
get /transactional/spout1/user/test-trident@0/6271
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata@50982f4e{topicPartition=test-trident-0, firstOffset=71, lastOffset=71}
{code}
I tried logging the RotatingTransactionalState on creation:
{code}
2017-08-03 17:44:37.767 o.a.s.t.t.s.RotatingTransactionalState Thread-26-spout-spout1-executor[10, 10] [DEBUG] Created RotatingTransactionalState{_state=org.apache.storm.trident.topology.state.TransactionalState@29823124, _subdir='test-trident@0', _curr={6204=null}} org.apache.storm.trident.topology.state.RotatingTransactionalState.<init>(RotatingTransactionalState.java:47)
{code}
The txid is correct, but the associated metadata object is null, which causes the spout to start over from the initial offset.

I looked into it a bit more, and it turns out the TransactionalState code assumes the metadata object can be round-trip serialized/deserialized with the json-simple library. When json-simple can't figure out how to JSON-serialize an object, it quietly returns the object's toString() instead (IMO it should throw an exception rather than quietly doing the wrong thing). That's what ends up being saved to Zookeeper for the Kafka spout, and our metadata object's toString() doesn't return JSON. When TransactionalState tries to load the metadata back from Zookeeper, it (apparently deliberately) uses a json-simple method that quietly returns null if JSON parsing fails. The result is that we quietly fail to write the metadata properly to Zookeeper, and then quietly fail to read the incorrectly serialized data back. The spout ends up starting over every time the executor is restarted, because it can't read the data in Zookeeper.
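For reference, the behavior can be reproduced with json-simple alone. This is a minimal sketch assuming json-simple 1.1.x on the classpath; BatchMetadata below is a made-up stand-in for the real metadata class, not KafkaTridentSpoutBatchMetadata itself:
{code}
import org.json.simple.JSONValue;

public class JsonSimpleRoundTripDemo {
    // Made-up stand-in for the spout's metadata: a plain POJO that json-simple
    // does not know how to serialize.
    static class BatchMetadata {
        final long firstOffset;
        final long lastOffset;
        BatchMetadata(long firstOffset, long lastOffset) {
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
        }
        @Override
        public String toString() {
            return "BatchMetadata{firstOffset=" + firstOffset + ", lastOffset=" + lastOffset + "}";
        }
    }

    public static void main(String[] args) {
        BatchMetadata meta = new BatchMetadata(1412, 1412);

        // For types it does not recognize, json-simple quietly falls back to toString(),
        // so this string is not valid JSON.
        String serialized = JSONValue.toJSONString(meta);
        System.out.println(serialized); // BatchMetadata{firstOffset=1412, lastOffset=1412}

        // JSONValue.parse() swallows the parse error and returns null, which then looks
        // like "no previous metadata" to the spout executor.
        Object deserialized = JSONValue.parse(serialized);
        System.out.println(deserialized); // null
    }
}
{code}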

> KafkaTridentSpoutOpaque not committing offsets to Kafka
> -------------------------------------------------------
>
>                 Key: STORM-2675
>                 URL: https://issues.apache.org/jira/browse/STORM-2675
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0
>            Reporter: Preet Puri
>
> Every time I restart the topology, the spout picks up the earliest message even though the poll strategy is set to UNCOMMITTED_EARLIEST. I looked at Kafka's __consumer_offsets topic to see if the spout (consumer) is committing the offsets, but did not find any commits. I am not even able to locate the code in the KafkaTridentSpoutEmitter class where we update the commits.
>     conf.put(Config.TOPOLOGY_DEBUG, true);
>     conf.put(Config.TOPOLOGY_WORKERS, 1);
>     conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); //tried with 1 as well
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"localhost"}));
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
>  protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig() {
>     ByTopicRecordTranslator<String, String> byTopic =
>         new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), r.value()),
>             new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);
>     return new KafkaSpoutConfig.Builder<String, String>(Utils.getBrokerHosts(),
>         StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
>             .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
>             .setRetry(getRetryService())
>             .setOffsetCommitPeriodMs(10_000)
>             .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>             .setMaxUncommittedOffsets(250)
>             .setProp("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>             .setProp("schema.registry.url","http://localhost:8081")
>             .setProp("specific.avro.reader",true)
>             .setGroupId(AGGREGATION_CONSUMER_GROUP)
>             .setRecordTranslator(byTopic).build();
>   }
> Stream pmStatStream =
>         topology.newStream("statStream", new KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1)
> storm-version - 1.1.0



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)