Posted to issues@storm.apache.org by "Janith Kaiprath Valiyalappil (JIRA)" <ji...@apache.org> on 2017/09/04 07:57:00 UTC

[jira] [Commented] (STORM-2675) KafkaTridentSpoutOpaque not committing offsets to Kafka

    [ https://issues.apache.org/jira/browse/STORM-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16152228#comment-16152228 ] 

Janith Kaiprath Valiyalappil commented on STORM-2675:
-----------------------------------------------------

I had assumed that if auto.commit.enable is true, auto.commit.interval.ms is relatively small, and we do not force-kill the topology, then all consumed messages would be committed automatically by the Kafka consumer. If that were the case, then when we start with UNCOMMITTED_EARLIEST/LATEST we should continue from the last committed position. This would apply only to the first batch; all later batches would depend on ZooKeeper.

I am just curious why it didn't follow the behavior described above, even with the bug where the first metadata is always null.
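
For reference, the auto-commit assumption above corresponds roughly to the consumer properties sketched below. Note that the new Kafka consumer names the first setting enable.auto.commit (auto.commit.enable is the old consumer's name). The class name AutoCommitProps and the 1000 ms interval are made up for illustration, and whether the Trident spout actually honors these settings is exactly the open question here.

```java
import java.util.Properties;

// Hypothetical helper, not part of storm-kafka-client.
public class AutoCommitProps {

    // Builds plain consumer properties with auto-commit switched on.
    static Properties build(int commitIntervalMs) {
        Properties props = new Properties();
        // New-consumer name for the auto-commit switch.
        props.setProperty("enable.auto.commit", "true");
        // How often the consumer commits offsets in the background.
        props.setProperty("auto.commit.interval.ms", Integer.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(1000); // illustrative value only
        System.out.println("enable.auto.commit=" + props.getProperty("enable.auto.commit"));
        System.out.println("auto.commit.interval.ms=" + props.getProperty("auto.commit.interval.ms"));
    }
}
```

In a topology these key/value pairs would presumably be passed through setProp on the KafkaSpoutConfig builder, the same way the issue description sets value.deserializer.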

> KafkaTridentSpoutOpaque not committing offsets to Kafka
> -------------------------------------------------------
>
>                 Key: STORM-2675
>                 URL: https://issues.apache.org/jira/browse/STORM-2675
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.0
>            Reporter: Preet Puri
>            Assignee: Stig Rohde Døssing
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Every time I restart the topology, the spout picks up the earliest message even though the poll strategy is set to UNCOMMITTED_EARLIEST. I looked at Kafka's __consumer_offsets topic to see whether the spout (consumer) is committing offsets, but did not find any commits. I am not even able to locate the code in the KafkaTridentSpoutEmitter class where the commits are updated.
>     conf.put(Config.TOPOLOGY_DEBUG, true);
>     conf.put(Config.TOPOLOGY_WORKERS, 1);
>     conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); // tried with 1 as well
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"localhost"}));
>     conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
>  protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig() {
>     ByTopicRecordTranslator<String, String> byTopic =
>         new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), r.value()),
>             new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);
>     return new KafkaSpoutConfig.Builder<String, String>(Utils.getBrokerHosts(),
>         StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
>             .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
>             .setRetry(getRetryService())
>             .setOffsetCommitPeriodMs(10_000)
>             .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>             .setMaxUncommittedOffsets(250)
>             .setProp("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>             .setProp("schema.registry.url","http://localhost:8081")
>             .setProp("specific.avro.reader",true)
>             .setGroupId(AGGREGATION_CONSUMER_GROUP)
>             .setRecordTranslator(byTopic).build();
>   }
> Stream pmStatStream =
>         topology.newStream("statStream", new KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1);
> storm-version - 1.1.0


