Posted to issues@storm.apache.org by "Preet Puri (JIRA)" <ji...@apache.org> on 2017/08/02 18:10:00 UTC

[jira] [Created] (STORM-2675) KafkaTridentSpoutOpaque not committing offsets to Kafka

Preet Puri created STORM-2675:
---------------------------------

             Summary: KafkaTridentSpoutOpaque not committing offsets to Kafka
                 Key: STORM-2675
                 URL: https://issues.apache.org/jira/browse/STORM-2675
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka-client
    Affects Versions: 1.1.0
            Reporter: Preet Puri


Every time I restart the topology, the spout starts from the earliest message, even though the first-poll offset strategy is set to UNCOMMITTED_EARLIEST. I looked at Kafka's __consumer_offsets topic to check whether the spout's consumer is committing offsets, but did not find any commits. I was also not able to locate the code in the KafkaTridentSpoutEmitter class where the offsets are committed. Is that code there at all?
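
The storm-kafka-client documentation describes UNCOMMITTED_EARLIEST as polling from the consumer group's last committed offset if there is one, and behaving like EARLIEST otherwise. The minimal Java sketch below models only that first-poll decision. FirstPollStrategy, FirstPollSketch and startOffset are hypothetical names, not the storm-kafka-client API, and the EARLIEST/LATEST branches are simplified. It illustrates why, if no offsets are ever committed to Kafka, every restart falls back to the earliest offset, which matches the symptom above.

```java
import java.util.OptionalLong;

/** Hypothetical mirror of the strategy names; not the storm-kafka-client enum. */
enum FirstPollStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

/** Hypothetical model of the first-poll decision for one partition. */
final class FirstPollSketch {
    private FirstPollSketch() {}

    /**
     * Returns the offset a consumer would start fetching from after a (re)start.
     *
     * @param committed the group's committed offset (by Kafka convention the next
     *                  offset to read), or empty if the group never committed
     * @param earliest  the partition's beginning offset
     * @param latest    the partition's end offset
     */
    static long startOffset(FirstPollStrategy strategy, OptionalLong committed,
                            long earliest, long latest) {
        switch (strategy) {
            case EARLIEST:
                return earliest;                      // simplified: ignores any commit
            case LATEST:
                return latest;                        // simplified: ignores any commit
            case UNCOMMITTED_EARLIEST:
                return committed.orElse(earliest);    // resume from commit, else beginning
            case UNCOMMITTED_LATEST:
                return committed.orElse(latest);      // resume from commit, else end
            default:
                throw new IllegalArgumentException("unknown strategy: " + strategy);
        }
    }
}
```

For example, startOffset(FirstPollStrategy.UNCOMMITTED_EARLIEST, OptionalLong.empty(), 0L, 500L) returns 0, while passing OptionalLong.of(120L) instead returns 120.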

    Config conf = new Config();
    conf.put(Config.TOPOLOGY_DEBUG, true);
    conf.put(Config.TOPOLOGY_WORKERS, 1);
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 4); // also tried with 1
    // ZooKeeper used by Trident for its transactional state
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/aggregate");
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);

 protected static KafkaSpoutConfig<String, String> getPMStatKafkaSpoutConfig() {
    ByTopicRecordTranslator<String, String> byTopic =
        new ByTopicRecordTranslator<>((r) -> new Values(r.topic(), r.key(), r.value()),
            new Fields(TOPIC, PARTITION_KEY, PAYLOAD), SENSOR_STREAM);

    return new KafkaSpoutConfig.Builder<String, String>(Utils.getBrokerHosts(),
        StringDeserializer.class, null, Utils.getKafkaEnrichedPMSTopicName())
            .setMaxPartitionFectchBytes(10 * 1024) // 10 KB
            .setRetry(getRetryService())
            .setOffsetCommitPeriodMs(10_000)
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            .setMaxUncommittedOffsets(250)
            .setProp("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
            .setProp("schema.registry.url","http://localhost:8081")
            .setProp("specific.avro.reader",true)
            .setGroupId(AGGREGATION_CONSUMER_GROUP)
            .setRecordTranslator(byTopic).build();
  }
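
The builder above sets setOffsetCommitPeriodMs(10_000), so the expectation is a commit to Kafka roughly every 10 seconds. As an assumption, loosely modelled on how the core (non-Trident) KafkaSpout is commonly described, a periodically committing consumer only advances its commit over a contiguous run of acked offsets, so a single un-acked offset holds the commit back. The JDK-only sketch below illustrates that idea for one partition. OffsetCommitSketch and its methods are hypothetical and are not storm-kafka-client classes.

```java
import java.util.TreeSet;

/** Hypothetical per-partition commit tracker; not a storm-kafka-client class. */
final class OffsetCommitSketch {
    private final long commitPeriodMs;
    private final TreeSet<Long> acked = new TreeSet<>();
    private long nextToCommit;      // Kafka convention: committed offset = next offset to read
    private long lastCommitTimeMs;

    OffsetCommitSketch(long startOffset, long commitPeriodMs, long nowMs) {
        this.nextToCommit = startOffset;
        this.commitPeriodMs = commitPeriodMs;
        this.lastCommitTimeMs = nowMs;
    }

    /** Records an acked offset; offsets already covered by a commit are ignored. */
    void ack(long offset) {
        if (offset >= nextToCommit) {
            acked.add(offset);
        }
    }

    /**
     * Returns the offset that would be committed now, or -1 if the commit period
     * has not elapsed or no new contiguous acked offsets are available.
     */
    long maybeCommit(long nowMs) {
        if (nowMs - lastCommitTimeMs < commitPeriodMs) {
            return -1;
        }
        lastCommitTimeMs = nowMs;
        long before = nextToCommit;
        // Advance over the contiguous prefix of acked offsets; a gap stops the advance.
        while (!acked.isEmpty() && acked.first() == nextToCommit) {
            acked.pollFirst();
            nextToCommit++;
        }
        return nextToCommit > before ? nextToCommit : -1;
    }
}
```

With a 10,000 ms period and offsets 0, 1 and 3 acked, maybeCommit(10_000) returns 2, because the missing ack for offset 2 blocks offset 3 from being committed.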

TridentTopology topology = new TridentTopology();
Stream pmStatStream =
        topology.newStream("statStream", new KafkaTridentSpoutOpaque<>(getPMStatKafkaSpoutConfig())).parallelismHint(1);

storm-version - 1.1.0



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)