Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2017/09/10 11:11:02 UTC
[jira] [Updated] (STORM-2675) KafkaTridentSpoutOpaque not committing offsets to Kafka
[ https://issues.apache.org/jira/browse/STORM-2675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stig Rohde Døssing updated STORM-2675:
--------------------------------------
Fix Version/s: 2.0.0
> KafkaTridentSpoutOpaque not committing offsets to Kafka
> -------------------------------------------------------
>
> Key: STORM-2675
> URL: https://issues.apache.org/jira/browse/STORM-2675
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.1.0
> Reporter: Preet Puri
> Assignee: Stig Rohde Døssing
> Fix For: 2.0.0
>
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Every time I restart the topology, the spout starts from the earliest message, even though the first-poll offset strategy is set to UNCOMMITTED_EARLIEST. I checked Kafka's __consumer_offsets topic to see whether the spout (consumer) is committing offsets, but found no commits. I also cannot find where the KafkaTridentSpoutEmitter class commits offsets. Where does that happen?
> Config conf = new Config();
> conf.put(Config.TOPOLOGY_DEBUG, true);
> conf.put(Config.TOPOLOGY_WORKERS, 1);
> conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); // also tried with 1
> conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
> conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList(new String[]{"localhost"}));
> conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
> protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig() {
>     ByTopicRecordTranslator<String, String> byTopic =
>         new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), r.value()),
>             new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);
>     return new KafkaSpoutConfig.Builder<String, String>(Utils.getBrokerHosts(),
>             StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
>         .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
>         .setRetry(getRetryService())
>         .setOffsetCommitPeriodMs(10_000)
>         .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>         .setMaxUncommittedOffsets(250)
>         .setProp("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>         .setProp("schema.registry.url", "http://localhost:8081")
>         .setProp("specific.avro.reader", true)
>         .setGroupId(AGGREGATION_CONSUMER_GROUP)
>         .setRecordTranslator(byTopic).build();
> }
> Stream pmStatStream =
>     topology.newStream("statStream", new KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1);
> storm-version - 1.1.0
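The behaviour the reporter expects from UNCOMMITTED_EARLIEST can be modelled roughly as follows. This is a simplified, hypothetical sketch, not Storm's code: the class `FirstPollSketch` and the method `resolveStartOffset` are illustrative names, and the logic assumes the documented semantics of storm-kafka-client's `FirstPollOffsetStrategy`, under which the UNCOMMITTED_* strategies resume from the consumer group's committed offset (Kafka's "next offset to read") when one exists and otherwise fall back to the earliest or latest offset. The model shows why a spout that never writes commits to `__consumer_offsets` restarts from the earliest message every time.

```java
// Hypothetical, simplified model of FirstPollOffsetStrategy semantics.
// Not Storm's implementation; names are illustrative only.
public class FirstPollSketch {

    /** Mirrors the four values of storm-kafka-client's FirstPollOffsetStrategy. */
    enum Strategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    /**
     * Picks the offset to start reading a partition from on the first poll
     * after the topology is (re)deployed.
     *
     * @param committed the group's committed offset (next offset to read), or null if none exists
     * @param earliest  the first available offset in the partition
     * @param latest    the log-end offset of the partition
     */
    static long resolveStartOffset(Strategy strategy, Long committed, long earliest, long latest) {
        switch (strategy) {
            case EARLIEST:
                return earliest;                 // ignores any committed offset
            case LATEST:
                return latest;                   // ignores any committed offset
            case UNCOMMITTED_EARLIEST:
                return committed != null ? committed : earliest;
            case UNCOMMITTED_LATEST:
                return committed != null ? committed : latest;
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // The group committed offset 42 before the restart, so reading resumes there.
        System.out.println(resolveStartOffset(Strategy.UNCOMMITTED_EARLIEST, 42L, 0L, 100L)); // prints 42
        // No commit was ever written (the symptom reported here), so it falls back to earliest.
        System.out.println(resolveStartOffset(Strategy.UNCOMMITTED_EARLIEST, null, 0L, 100L)); // prints 0
    }
}
```

Under this model, UNCOMMITTED_EARLIEST can only avoid replaying from the beginning if something actually commits offsets for the consumer group. With no entries in `__consumer_offsets`, every restart takes the fallback branch.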
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)