Posted to dev@storm.apache.org by janithkv <gi...@git.apache.org> on 2018/11/18 19:09:08 UTC

[GitHub] storm pull request #2911: Add TIMESTAMP option for FirstPollOffset

GitHub user janithkv opened a pull request:

    https://github.com/apache/storm/pull/2911

    Add TIMESTAMP option for FirstPollOffset

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/janithkv/storm STORM-3279_trident_kafka

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2911.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2911
    
----
commit 087122d867da24b3dee1feef5b1a8e1e82786c75
Author: janithkv <ja...@...>
Date:   2018-11-18T19:07:37Z

    Add TIMESTAMP option for FirstPollOffset

----


---

[GitHub] storm pull request #2911: STORM-2720 : Add TIMESTAMP option for FirstPollOff...

Posted by janithkv <gi...@git.apache.org>.
Github user janithkv commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2911#discussion_r234878772
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -225,7 +229,23 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record)
          */
         private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
             if (isFirstPoll(tp)) {
    -            if (firstPollOffsetStrategy == EARLIEST) {
    +            if(firstPollOffsetStrategy.equals(TIMESTAMP)) {
    +                Long startTimeStampOffset = null;
    +                try {
    +                    startTimeStampOffset =
    +                            consumer.offsetsForTimes(Collections.singletonMap(tp, startTimeStamp)).get(tp).offset();
    +                } catch (IllegalArgumentException e) {
    +                    LOG.error("Illegal timestamp {} provided for tp {} ",startTimeStamp,tp.toString());
    +                } catch (UnsupportedVersionException e) {
    +                    LOG.error("Kafka Server do not support offsetsForTimes(), probably < 0.10.1",e);
    +                }
    +                if(startTimeStampOffset != null) {
    +                    LOG.debug("First poll for topic partition [{}], seeking to partition from startTimeStamp [{}]", tp , startTimeStamp);
    +                    consumer.seek(tp, startTimeStampOffset);
    +                } else {
    +                    LOG.info("Kafka consumer offset reset by timestamp failed for TopicPartition {}, TimeStamp {}, Offset {}. Restart with a different Strategy ", tp, startTimeStamp, startTimeStampOffset);
    --- End diff --
    
    OK, will change.


---

[GitHub] storm pull request #2911: Add TIMESTAMP option for FirstPollOffset

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2911#discussion_r234541339
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -137,7 +142,11 @@ public KafkaSpoutConfig(Builder<K, V> builder) {
             /**
              * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST
              */
    -        UNCOMMITTED_LATEST;
    +        UNCOMMITTED_LATEST,
    +        /**
    +         * Start at the earliest offset whose timestamp is greater than or equal to the given startTimestamp
    +         */
    +        TIMESTAMP;
    --- End diff --
    
    I'm wondering whether it would be useful to have both TIMESTAMP and UNCOMMITTED_TIMESTAMP.
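
The javadoc in this diff defines TIMESTAMP as starting at the earliest offset whose timestamp is greater than or equal to startTimestamp. The following sketch models only that rule over a hypothetical in-memory list of (offset, timestamp) records. It is not how Kafka resolves offsetsForTimes() (the broker uses a time index), and the class and method names are illustrative. It mirrors two documented behaviors of offsetsForTimes(): a negative timestamp is rejected with IllegalArgumentException, and when no record is at or after the target the lookup yields nothing (Kafka returns null for that partition). Record timestamps are not assumed to be monotonic, so the scan is linear in offset order.

```java
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical in-memory model of one partition, for illustration only. */
final class TimestampLookup {

    /** One record: its offset in the partition and its timestamp in epoch milliseconds. */
    static final class Rec {
        final long offset;
        final long timestamp;

        Rec(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /**
     * Returns the earliest offset whose timestamp is >= targetTimestamp, or empty if
     * there is no such record. Records must be given in offset order; timestamps may
     * be out of order, so every record is checked from the start.
     */
    static OptionalLong earliestOffsetAtOrAfter(List<Rec> records, long targetTimestamp) {
        if (targetTimestamp < 0) {
            // Mirrors the precondition offsetsForTimes() enforces with IllegalArgumentException.
            throw new IllegalArgumentException("Negative timestamp: " + targetTimestamp);
        }
        for (Rec r : records) {
            if (r.timestamp >= targetTimestamp) {
                return OptionalLong.of(r.offset);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        List<Rec> partition = List.of(
                new Rec(0, 1_000L), new Rec(1, 2_000L), new Rec(2, 3_000L));
        System.out.println(earliestOffsetAtOrAfter(partition, 1_500L)); // OptionalLong[1]
        System.out.println(earliestOffsetAtOrAfter(partition, 9_000L)); // OptionalLong.empty
    }
}
```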


---

[GitHub] storm pull request #2911: Add TIMESTAMP option for FirstPollOffset

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2911#discussion_r234542042
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -225,7 +229,23 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record)
          */
         private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
             if (isFirstPoll(tp)) {
    -            if (firstPollOffsetStrategy == EARLIEST) {
    +            if(firstPollOffsetStrategy.equals(TIMESTAMP)) {
    +                Long startTimeStampOffset = null;
    +                try {
    +                    startTimeStampOffset =
    +                            consumer.offsetsForTimes(Collections.singletonMap(tp, startTimeStamp)).get(tp).offset();
    +                } catch (IllegalArgumentException e) {
    +                    LOG.error("Illegal timestamp {} provided for tp {} ",startTimeStamp,tp.toString());
    +                } catch (UnsupportedVersionException e) {
    +                    LOG.error("Kafka Server do not support offsetsForTimes(), probably < 0.10.1",e);
    +                }
    +                if(startTimeStampOffset != null) {
    +                    LOG.debug("First poll for topic partition [{}], seeking to partition from startTimeStamp [{}]", tp , startTimeStamp);
    +                    consumer.seek(tp, startTimeStampOffset);
    +                } else {
    +                    LOG.info("Kafka consumer offset reset by timestamp failed for TopicPartition {}, TimeStamp {}, Offset {}. Restart with a different Strategy ", tp, startTimeStamp, startTimeStampOffset);
    --- End diff --
    
    I would like to crash the worker here.
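
One way to act on this, and on the related comment about not catching the lookup's exceptions, sketched under assumptions: the first-poll seek treats a missing offset as fatal instead of logging it at INFO, and it does not wrap the lookup in try/catch, so IllegalArgumentException or UnsupportedVersionException would propagate and, as the review suggests, let the worker die. OffsetLookup is a hypothetical stand-in for consumer.offsetsForTimes(...).get(tp). Because offsetsForTimes() returns null for a partition with no message at or after the timestamp, the diff's chained .get(tp).offset() call would throw a NullPointerException in exactly that case, so the null check has to happen before .offset() is called.

```java
/**
 * Hypothetical stand-in for reading the offset from consumer.offsetsForTimes(...).get(tp).
 * Returns null when the partition has no message at or after the timestamp.
 */
@FunctionalInterface
interface OffsetLookup {
    Long offsetAtOrAfter(long timestampMs);
}

final class FailFastSeek {

    /**
     * Resolves the first-poll offset for the TIMESTAMP strategy. Exceptions thrown by the
     * lookup are deliberately not caught, and a missing offset is treated as fatal rather
     * than being logged and ignored.
     */
    static long resolveStartOffset(OffsetLookup lookup, String partition, long startTimestampMs) {
        Long offset = lookup.offsetAtOrAfter(startTimestampMs);
        if (offset == null) {
            throw new IllegalStateException("No offset at or after timestamp " + startTimestampMs
                    + " for partition " + partition + "; refusing to start");
        }
        return offset;
    }

    public static void main(String[] args) {
        OffsetLookup found = ts -> 42L;
        OffsetLookup missing = ts -> null;
        System.out.println(resolveStartOffset(found, "topic-0", 1_000L)); // 42
        try {
            resolveStartOffset(missing, "topic-0", 1_000L);
        } catch (IllegalStateException e) {
            // Caught here only to demonstrate; in the spout it would propagate.
            System.out.println("worker would crash: " + e.getMessage());
        }
    }
}
```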


---

[GitHub] storm pull request #2911: STORM-2720 : Add TIMESTAMP option for FirstPollOff...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2911#discussion_r234901127
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -137,7 +142,11 @@ public KafkaSpoutConfig(Builder<K, V> builder) {
             /**
              * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST
              */
    -        UNCOMMITTED_LATEST;
    +        UNCOMMITTED_LATEST,
    +        /**
    +         * Start at the earliest offset whose timestamp is greater than or equal to the given startTimestamp
    +         */
    +        TIMESTAMP;
    --- End diff --
    
    I'm not sure I follow. What I mean is that we should have both TIMESTAMP, which unconditionally seeks to a timestamp, and UNCOMMITTED_TIMESTAMP, which seeks to a timestamp only if no offset has been committed yet. That way, you can use UNCOMMITTED_TIMESTAMP to start at a timestamp the first time you deploy the topology, and the topology won't reset to that timestamp when you redeploy.
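
The difference between the two options can be sketched as a small decision function. UNCOMMITTED_TIMESTAMP is only proposed in this thread, and the enum, method and parameter names here are hypothetical. The committed position is modeled as an OptionalLong that is empty on the first deployment, and the timestamp lookup is passed as a LongSupplier so it only runs when its result is actually used.

```java
import java.util.OptionalLong;
import java.util.function.LongSupplier;

final class FirstPollDecision {

    /** Hypothetical subset of strategies; UNCOMMITTED_TIMESTAMP is only a proposal. */
    enum Strategy { TIMESTAMP, UNCOMMITTED_TIMESTAMP }

    /**
     * Chooses the offset to start from on the first poll of a partition.
     * TIMESTAMP always seeks to the timestamp; UNCOMMITTED_TIMESTAMP does so only
     * when nothing has been committed yet, so a redeploy resumes where it left off.
     */
    static long startOffset(Strategy strategy, OptionalLong committed, LongSupplier timestampOffset) {
        switch (strategy) {
            case TIMESTAMP:
                return timestampOffset.getAsLong();
            case UNCOMMITTED_TIMESTAMP:
                // The lookup is only invoked when there is no committed position.
                return committed.orElseGet(timestampOffset);
            default:
                throw new AssertionError("Unknown strategy " + strategy);
        }
    }

    public static void main(String[] args) {
        LongSupplier fromTimestamp = () -> 100L;
        // First deployment: nothing committed yet.
        System.out.println(startOffset(Strategy.UNCOMMITTED_TIMESTAMP, OptionalLong.empty(), fromTimestamp)); // 100
        // Redeploy after offset 250 was committed.
        System.out.println(startOffset(Strategy.UNCOMMITTED_TIMESTAMP, OptionalLong.of(250L), fromTimestamp)); // 250
        System.out.println(startOffset(Strategy.TIMESTAMP, OptionalLong.of(250L), fromTimestamp)); // 100
    }
}
```

On a redeploy with a committed offset, TIMESTAMP seeks back to the timestamp again, while UNCOMMITTED_TIMESTAMP resumes from the committed position.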


---

[GitHub] storm pull request #2911: Add TIMESTAMP option for FirstPollOffset

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2911#discussion_r234541619
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -225,7 +229,23 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record)
          */
         private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
             if (isFirstPoll(tp)) {
    -            if (firstPollOffsetStrategy == EARLIEST) {
    +            if(firstPollOffsetStrategy.equals(TIMESTAMP)) {
    +                Long startTimeStampOffset = null;
    +                try {
    +                    startTimeStampOffset =
    +                            consumer.offsetsForTimes(Collections.singletonMap(tp, startTimeStamp)).get(tp).offset();
    +                } catch (IllegalArgumentException e) {
    --- End diff --
    
    I would rather not catch this exception or the UnsupportedVersionException. We can't meaningfully continue in this case, so I think we should just let the exception kill the worker.


---

[GitHub] storm pull request #2911: STORM-2720 : Add TIMESTAMP option for FirstPollOff...

Posted by janithkv <gi...@git.apache.org>.
Github user janithkv commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2911#discussion_r234878731
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -137,7 +142,11 @@ public KafkaSpoutConfig(Builder<K, V> builder) {
             /**
              * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST
              */
    -        UNCOMMITTED_LATEST;
    +        UNCOMMITTED_LATEST,
    +        /**
    +         * Start at the earliest offset whose timestamp is greater than or equal to the given startTimestamp
    +         */
    +        TIMESTAMP;
    --- End diff --
    
    Hmm, not sure. In practice, I think it might make sense to go back a few minutes rather than all the way to EARLIEST. But that might be a bit out of scope.
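
Going back "a few minutes" would mean passing a start timestamp computed relative to the current time. A minimal sketch, assuming the option takes epoch milliseconds (the unit Kafka uses for record timestamps and for offsetsForTimes()); the helper name is hypothetical, and a java.time.Clock is injected so the result can be checked deterministically.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

final class LookbackTimestamp {

    /** Returns the epoch-millisecond timestamp that lies `lookback` before the clock's current instant. */
    static long startTimestampMs(Clock clock, Duration lookback) {
        if (lookback.isNegative()) {
            throw new IllegalArgumentException("Lookback must not be negative: " + lookback);
        }
        return clock.instant().minus(lookback).toEpochMilli();
    }

    public static void main(String[] args) {
        Clock fixed = Clock.fixed(Instant.parse("2018-11-18T19:10:00Z"), ZoneOffset.UTC);
        // Five minutes before 19:10:00 UTC.
        long ts = startTimestampMs(fixed, Duration.ofMinutes(5));
        System.out.println(Instant.ofEpochMilli(ts)); // 2018-11-18T19:05:00Z
    }
}
```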


---

[GitHub] storm pull request #2911: STORM-2720 : Add TIMESTAMP option for FirstPollOff...

Posted by janithkv <gi...@git.apache.org>.
Github user janithkv commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2911#discussion_r234878746
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java ---
    @@ -225,7 +229,23 @@ private void emitTuple(TridentCollector collector, ConsumerRecord<K, V> record)
          */
         private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta) {
             if (isFirstPoll(tp)) {
    -            if (firstPollOffsetStrategy == EARLIEST) {
    +            if(firstPollOffsetStrategy.equals(TIMESTAMP)) {
    +                Long startTimeStampOffset = null;
    +                try {
    +                    startTimeStampOffset =
    +                            consumer.offsetsForTimes(Collections.singletonMap(tp, startTimeStamp)).get(tp).offset();
    +                } catch (IllegalArgumentException e) {
    --- End diff --
    
    OK, will change.


---