Posted to issues@storm.apache.org by "Jungtaek Lim (JIRA)" <ji...@apache.org> on 2017/09/13 08:05:00 UTC

[jira] [Resolved] (STORM-2675) KafkaTridentSpoutOpaque not committing offsets to Kafka

     [ https://issues.apache.org/jira/browse/STORM-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim resolved STORM-2675.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 1.2.0

Also merged into 1.x branch.

> KafkaTridentSpoutOpaque not committing offsets to Kafka
> -------------------------------------------------------
>
>                 Key: STORM-2675
>                 URL: https://issues.apache.org/jira/browse/STORM-2675
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0
>            Reporter: Preet Puri
>            Assignee: Stig Rohde Døssing
>              Labels: pull-request-available
>             Fix For: 2.0.0, 1.2.0
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Every time I restart the topology, the spout picks up the earliest message even though the first poll offset strategy is set to UNCOMMITTED_EARLIEST. I looked at Kafka's __consumer_offsets topic to see whether the spout (consumer) is committing offsets, but did not find any commits. I also cannot locate the code in the KafkaTridentSpoutEmitter class where the commits are updated.
>     conf.put(Config.TOPOLOGY_DEBUG, true);
>     conf.put(Config.TOPOLOGY_WORKERS, 1);
>     conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); // tried with 1 as well
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"localhost"}));
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
>  protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig() {
>     ByTopicRecordTranslator<String, String> byTopic =
>         new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), r.value()),
>             new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);
>     return new KafkaSpoutConfig.Builder<String, String>(Utils.getBrokerHosts(),
>         StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
>             .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
>             .setRetry(getRetryService())
>             .setOffsetCommitPeriodMs(10_000)
>             .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>             .setMaxUncommittedOffsets(250)
>             .setProp("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>             .setProp("schema.registry.url","http://localhost:8081")
>             .setProp("specific.avro.reader",true)
>             .setGroupId(AGGREGATION_CONSUMER_GROUP)
>             .setRecordTranslator(byTopic).build();
>   }
> Stream pmStatStream =
>         topology.newStream("statStream", new KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1);
> storm-version - 1.1.0
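
For anyone trying to reproduce the observation above, here is a minimal sketch of checking whether the spout's consumer group has committed anything to Kafka, using the plain Kafka consumer API. The broker address, topic name, and partition below are placeholder assumptions, not values taken from the report; the group id stands in for whatever AGGREGATION_CONSUMER_GROUP resolves to.

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class CommittedOffsetCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");      // assumed broker address
            props.put("group.id", "aggregation-consumer-group");   // assumed consumer group id
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Hypothetical topic/partition; substitute the topic used by the spout
                TopicPartition tp = new TopicPartition("enriched-pm-stats", 0);
                // committed() returns null if the group has never committed an offset for this partition
                OffsetAndMetadata committed = consumer.committed(tp);
                System.out.println(committed == null
                        ? "No committed offset for " + tp
                        : "Committed offset for " + tp + ": " + committed.offset());
            }
        }
    }

If nothing has been committed, committed() returns null for every partition, which matches the empty __consumer_offsets observation described in the report.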



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)