You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2017/08/03 19:56:03 UTC

[jira] [Comment Edited] (STORM-2675) KafkaTridentSpoutOpaque not committing offsets to Kafka

    [ https://issues.apache.org/jira/browse/STORM-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113379#comment-16113379 ] 

Stig Rohde Døssing edited comment on STORM-2675 at 8/3/17 7:55 PM:
-------------------------------------------------------------------

I think we should change the metadata object so it returns a Map to Trident instead of the current KafkaTridentSpoutBatchMetadata. json-simple knows how to handle Map. I'll open a PR soon assuming it works. It might be nice in the future to replace json-simple with something more flexible, like Jackson so spout implementations can use whatever object type they want, but for now I'd rather just fix this.


was (Author: srdo):
I think we should change the metadata object so it returns a Map to Trident instead of the current KafkaTridentSpoutBatchMetadata. json-simple knows how to handle Map. I'll open a PR soon assuming it works.

> KafkaTridentSpoutOpaque not committing offsets to Kafka
> -------------------------------------------------------
>
>                 Key: STORM-2675
>                 URL: https://issues.apache.org/jira/browse/STORM-2675
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0
>            Reporter: Preet Puri
>            Assignee: Stig Rohde Døssing
>
> Every time I restart the topology the spout was picking the earliest message even though poll strategy is set UNCOMMITTED_EARLIEST.  I looked at Kafka's  __consumer_offsets topic to see if spout (consumer) is committing the offsets but did not find any commits. I am not even able to locate the code in the KafkaTridentSpoutEmitter class where we are updating the commits?
>     conf.put(Config.TOPOLOGY_DEBUG, true);
>     conf.put(Config.TOPOLOGY_WORKERS, 1);
>     conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); //tried with1 as well
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"localhost"}));
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
>  protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig() {
>     ByTopicRecordTranslator<String, String> byTopic =
>         new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), r.value()),
>             new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);
>     return new KafkaSpoutConfig.Builder<String, String>(Utils.getBrokerHosts(),
>         StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
>             .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
>             .setRetry(getRetryService())
>             .setOffsetCommitPeriodMs(10_000)
>             .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>             .setMaxUncommittedOffsets(250)
>             .setProp("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>             .setProp("schema.registry.url","http://localhost:8081")
>             .setProp("specific.avro.reader",true)
>             .setGroupId(AGGREGATION_CONSUMER_GROUP)
>             .setRecordTranslator(byTopic).build();
>   }
> Stream pmStatStream =
>         topology.newStream("statStream", new KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1)
> storm-version - 1.1.0



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)