Posted to dev@storm.apache.org by srdo <gi...@git.apache.org> on 2017/02/05 18:57:28 UTC

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/1924

    STORM-2343: New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once

    This builds on https://github.com/apache/storm/pull/1832, sorry for the mixed diff
    
    This makes the spout emit failed tuples even when numUncommittedOffsets has reached maxUncommittedOffsets. Previously the spout would fail to progress in that case. I haven't really been able to enforce maxUncommittedOffsets strictly without having to request extra messages from Kafka and throwing them away. The consumer doesn't allow the spout to limit how many tuples it requests dynamically, and I'd prefer that the spout doesn't truncate the list of records it receives.
    
    Instead, maxUncommittedOffsets is now a soft cap on the number of tuples that can be emitted before some must be committed. In some cases (e.g. 10 partitions with 1 failed message emitted on each), the spout will exceed the limit, but it should never exceed it by more than maxPollRecords.
    
    This also makes KafkaSpoutRetryExponentialBackoff use Storm Time, and it fixes a bug where messages could be lost if they were scheduled for retry at the same timestamp. It also fixes double counting failed tuples in numUncommittedOffsets when retrying, since the counter isn't decreased when the tuple is scheduled for retry.
    
    maxPollRecords is now capped by maxUncommittedOffsets, since if maxPollRecords is higher the spout will exceed the limit on any poll where Kafka can return maxPollRecords messages.
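A minimal sketch of the two rules described above, assuming hypothetical helper names (`effectiveMaxPollRecords`, `mayPoll`) that are not taken from the actual KafkaSpout code: maxPollRecords is clamped to maxUncommittedOffsets, and polling stays allowed at the cap when failed tuples are ready for retry.

```java
final class SoftCapSketch {

    /** Hypothetical helper: maxPollRecords is capped by maxUncommittedOffsets. */
    static int effectiveMaxPollRecords(int maxPollRecords, int maxUncommittedOffsets) {
        return Math.min(maxPollRecords, maxUncommittedOffsets);
    }

    /**
     * Hypothetical helper: the spout may poll while it is under the cap,
     * or when failed tuples are ready to be retried (the soft part of the cap).
     */
    static boolean mayPoll(int numUncommittedOffsets, int maxUncommittedOffsets,
                           boolean hasRetriableTuples) {
        return numUncommittedOffsets < maxUncommittedOffsets || hasRetriableTuples;
    }

    public static void main(String[] args) {
        System.out.println(effectiveMaxPollRecords(500, 10)); // 10
        System.out.println(mayPoll(10, 10, false));           // false
        System.out.println(mayPoll(10, 10, true));            // true
    }
}
```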

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2343

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1924
    
----
commit d7432f8e39c2dd91902bb281be6381d8ce4d53fe
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Date:   2017-02-04T08:04:19Z

    STORM-2250: Kafka Spout Refactoring to Increase Modularity and Testability

commit 1c3f9ab53995236cb1a92ba8a51cdfcf73a21bfc
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Date:   2017-02-05T18:46:49Z

    STORM-2343: Fix new kafka spout stopping processing if more than maxUncommittedOffsets tuples fail at once

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99869874
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -54,59 +56,92 @@
         private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
             @Override
             public int compare(RetrySchedule entry1, RetrySchedule entry2) {
    -            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            int result = Long.valueOf(entry1.nextRetryTimeMs()).compareTo(entry2.nextRetryTimeMs());
    +            
    +            if(result == 0) {
    +                //TreeSet uses compareTo instead of equals() for the Set contract
    +                //Ensure that we can save two retry schedules with the same timestamp
    +                result = entry1.hashCode() - entry2.hashCode();
    +            }
    +            return result;
             }
         }
     
         private class RetrySchedule {
             private final KafkaSpoutMessageId msgId;
    -        private long nextRetryTimeNanos;
    +        private long nextRetryTimeMs;
     
             public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
                 this.msgId = msgId;
    -            this.nextRetryTimeNanos = nextRetryTime;
    +            this.nextRetryTimeMs = nextRetryTime;
                 LOG.debug("Created {}", this);
             }
     
             public void setNextRetryTime() {
    -            nextRetryTimeNanos = nextTime(msgId);
    +            nextRetryTimeMs = nextTime(msgId);
                 LOG.debug("Updated {}", this);
             }
     
    -        public boolean retry(long currentTimeNanos) {
    -            return nextRetryTimeNanos <= currentTimeNanos;
    +        public boolean retry(long currentTimeMs) {
    +            return nextRetryTimeMs <= currentTimeMs;
             }
     
             @Override
             public String toString() {
                 return "RetrySchedule{" +
                         "msgId=" + msgId +
    -                    ", nextRetryTime=" + nextRetryTimeNanos +
    +                    ", nextRetryTimeMs=" + nextRetryTimeMs +
                         '}';
             }
     
    +        @Override
    +        public int hashCode() {
    --- End diff --
    
    ok
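For context on the comparator change in the diff above: a TreeSet treats two elements as duplicates when its comparator returns 0, so a comparator that looks only at the retry timestamp silently drops a second entry scheduled for the same time. The sketch below illustrates this with a hypothetical `Entry` class standing in for `RetrySchedule`. It breaks ties with `Integer.compare` instead of the subtraction used in the diff, which is a substitution made here to avoid integer overflow, not what the PR does.

```java
import java.util.Comparator;
import java.util.TreeSet;

final class RetryTieBreakSketch {

    /** Hypothetical stand-in for RetrySchedule; identity is based on msgId only. */
    static final class Entry {
        final String msgId;
        final long nextRetryTimeMs;

        Entry(String msgId, long nextRetryTimeMs) {
            this.msgId = msgId;
            this.nextRetryTimeMs = nextRetryTimeMs;
        }

        @Override
        public int hashCode() {
            return msgId.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof Entry && ((Entry) obj).msgId.equals(msgId);
        }
    }

    /** Compares by timestamp only: equal timestamps look like duplicates to a TreeSet. */
    static final Comparator<Entry> BY_TIME_ONLY =
            Comparator.comparingLong(e -> e.nextRetryTimeMs);

    /** Compares by timestamp, then by hash code, so distinct entries can coexist. */
    static final Comparator<Entry> BY_TIME_THEN_HASH = (a, b) -> {
        int result = Long.compare(a.nextRetryTimeMs, b.nextRetryTimeMs);
        return result != 0 ? result : Integer.compare(a.hashCode(), b.hashCode());
    };

    /** Adds two different messages with the same retry time and reports the set size. */
    static int sizeAfterAdding(Comparator<Entry> comparator) {
        TreeSet<Entry> set = new TreeSet<>(comparator);
        set.add(new Entry("topic-0-3", 1000L));
        set.add(new Entry("topic-0-4", 1000L));
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterAdding(BY_TIME_ONLY));      // 1, one entry is lost
        System.out.println(sizeAfterAdding(BY_TIME_THEN_HASH)); // 2
    }
}
```

Note that a hash-code tie-break still treats two entries as duplicates if both the timestamp and the hash code collide.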



[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @hmcl Thanks for the explanation. It makes more sense now.
    
    I don't think we can meaningfully handle the case where the user has specified no cap on retries and one or more tuples keep failing.
    
    In the situation you describe, `maxUncommittedOffsets` isn't really solving the problem. If offset 3 fails and all other tuples can be acked, the spout will emit another `maxUncommittedOffsets` tuples on the same partition as 3, and will then stop emitting because it can't commit any tuples until 3 is acked. At this point, only offset 3 will get re-emitted until it hopefully succeeds at some point, so having a continually failing tuple (or just one that fails many times) will still "clog" the spout.
    
    If `maxUncommittedOffsets` is removed, you're right that the `acked` map size is technically uncapped, but given the amount of information we store, and how the spout handles retries, I think it is a non-issue. Just to illustrate, say we remove `maxUncommittedOffsets`, and let's say again that offset 3 is failing a large number of times, and the last acked offset is far beyond it, e.g. 5000000. When 3 is ready for retry, the consumer is reset to that position on the relevant partition. This means it fetches and emits message 3. It will also have to fetch 4...5000000 (potentially over many calls to `poll()`), because we never seek forward to where we left off. The spout therefore has to spend time fetching and discarding all tuples up to 5000000 before it can finally emit another tuple. It seems likely that it'll hit the point where it is spending all its time fetching and discarding acked tuples before it runs out of memory to store their offsets.
    
    Disregarding the issue with consumer seeking, because my reasoning relies on an implementation detail of how we do retries, I'm still not sure `maxUncommittedOffsets` is allowing higher throughput compared to not having it. If we allow the spout to fail with an `OutOfMemoryError`, the spout will have had higher throughput up to the crash than if it were being throttled by `maxUncommittedOffsets` (because otherwise it would also have had an OOME in that case). It really seems to me like all `maxUncommittedOffsets` is doing is trading having the spout potentially cause an OOME due to `acked` size, in exchange for making the spout react to the same situation by not emitting any more tuples. I'm not sure that is better, because data flow stops in either case. 
    
    `maxUncommittedOffsets` could have some value in a system where the `KafkaSpout` is a secondary stream source and the messages coming out of it aren't time sensitive. In that case it might be fine to let the `KafkaSpout` stop emitting tuples for a while if some tuples temporarily can't be acked, but having it cause an OOME would be too disruptive because the primary spout could be doing fine. I can't come up with a concrete example of this kind of configuration though.
    
    I'd be fine with keeping `maxUncommittedOffsets` for that kind of situation.



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r106685548
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -268,12 +268,12 @@ private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class
             
             /**
              * The maximum number of records a poll will return.
    -         * Will only work with Kafka 0.10.0 and above.
    +         * This is limited by maxUncommittedOffsets, since it doesn't make sense to allow larger polls than the spout is allowed to emit.
    +         * Please note that when there are retriable tuples on a partition, maxPollRecords is an upper bound for how far the spout will read past the last committed offset on that partition.
    +         * It is recommended that users set maxUncommittedOffsets and maxPollRecords to be equal.
    --- End diff --
    
    @hmcl I agree that it is not ideal, but there's an issue with letting maxUncommittedOffsets be larger than maxPollRecords.
    
    From an earlier response:
    "If maxPollRecords is less than maxUncommittedOffsets, there's a risk of the spout getting stuck on some tuples for a while when it is retrying tuples.
    Say there are 10 retriable tuples following the last committed offset, and maxUncommittedOffsets is 10. If maxPollRecords is 5 and the first 5 retriable tuples are reemitted in the first batch, the next 5 tuples can't be emitted until (some of) the first 5 are acked. This is because the spout will seek the consumer back to the last committed offset any time there are failed tuples, which will lead to it getting the first 5 tuples out of the consumer, checking that they are emitted, and skipping them. This will repeat until the last committed offset moves. If there are other partitions with tuples available, those tuples may get emitted, but the "blocked" partition won't progress until some tuples are acked on it."
    
    How about we fix this by making doSeekRetriableTopicPartitions seek to the lowest retriable offset per partition for the partitions with failed tuples, instead of seeking to the last committed/committable offset? It seems like seeking to the last committed offset is likely to have some bad interactions with maxUncommittedOffsets and maxPollRecords. 
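A rough sketch of the proposal above, computing the lowest retriable offset per partition so the consumer could seek there instead of to the last committed offset. `FailedOffset` and `earliestRetriableOffsets` are hypothetical names for illustration, and partitions are represented as plain strings rather than Kafka's `TopicPartition` to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class EarliestRetriableOffsetsSketch {

    /** Hypothetical record of a failed tuple and when it becomes retriable. */
    static final class FailedOffset {
        final String partition;
        final long offset;
        final long nextRetryTimeMs;

        FailedOffset(String partition, long offset, long nextRetryTimeMs) {
            this.partition = partition;
            this.offset = offset;
            this.nextRetryTimeMs = nextRetryTimeMs;
        }
    }

    /**
     * Returns, for each partition with at least one tuple ready for retry,
     * the lowest offset that is ready. Tuples whose retry time is still in
     * the future are ignored.
     */
    static Map<String, Long> earliestRetriableOffsets(List<FailedOffset> failed, long nowMs) {
        Map<String, Long> earliest = new HashMap<>();
        for (FailedOffset f : failed) {
            if (f.nextRetryTimeMs <= nowMs) {
                // Keep the minimum offset seen so far for this partition.
                earliest.merge(f.partition, f.offset, Long::min);
            }
        }
        return earliest;
    }
}
```

A caller would then seek each listed partition to the returned offset before polling, which avoids re-fetching already emitted tuples between the last committed offset and the first retriable one.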



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r103352827
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -268,12 +268,11 @@ private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class
             
             /**
              * The maximum number of records a poll will return.
    -         * Will only work with Kafka 0.10.0 and above.
    +         * This is limited by maxUncommittedOffsets, since it doesn't make sense to allow larger polls than the spout is allowed to emit
    +         * Please note that maxPollRecords effectively defines how far past the last committed offset the spout will be allowed to read, if there are failed tuples to retry.
    --- End diff --
    
    what do you mean by "if there are failed tuples to retry" ? I am not quite following what you mean in here.



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99639454
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -329,11 +328,13 @@ private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class
                 this.offsetCommitPeriodMs = offsetCommitPeriodMs;
                 return this;
             }
    -        
    +
             /**
              * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
              * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
    -         * of pending offsets bellow the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
    +         * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
    +         * Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than 2*maxPollRecords.
    --- End diff --
    
    Sorry, this is a mistake. The worst case is one where maxUncommittedOffsets - 1 messages have been emitted. In that case, the spout may poll and emit another maxPollRecords messages. So the comment should say maxPollRecords, not 2*maxPollRecords.
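The worst case described above can be worked through as simple arithmetic. The helper names below are hypothetical and only restate the reasoning in the comment: with maxUncommittedOffsets - 1 tuples outstanding the spout is still allowed to poll, and that poll may return up to maxPollRecords records.

```java
final class WorstCaseSketch {

    /** Highest number of uncommitted offsets reachable after one more poll. */
    static int worstCaseUncommitted(int maxUncommittedOffsets, int maxPollRecords) {
        // One below the cap, so polling is still allowed; the poll returns a full batch.
        return (maxUncommittedOffsets - 1) + maxPollRecords;
    }

    /** How far the worst case goes past the configured cap. */
    static int worstCaseOvershoot(int maxUncommittedOffsets, int maxPollRecords) {
        return worstCaseUncommitted(maxUncommittedOffsets, maxPollRecords) - maxUncommittedOffsets;
    }

    public static void main(String[] args) {
        System.out.println(worstCaseUncommitted(10, 5)); // 14
        System.out.println(worstCaseOvershoot(10, 5));   // 4
    }
}
```

Under these assumptions the overshoot is maxPollRecords - 1, which stays within the maxPollRecords bound stated above.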



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99638909
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -54,59 +56,92 @@
         private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
             @Override
             public int compare(RetrySchedule entry1, RetrySchedule entry2) {
    -            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            int result = Long.valueOf(entry1.nextRetryTimeMs()).compareTo(entry2.nextRetryTimeMs());
    +            
    +            if(result == 0) {
    +                //TreeSet uses compareTo instead of equals() for the Set contract
    +                //Ensure that we can save two retry schedules with the same timestamp
    +                result = entry1.hashCode() - entry2.hashCode();
    +            }
    +            return result;
             }
         }
     
         private class RetrySchedule {
             private final KafkaSpoutMessageId msgId;
    -        private long nextRetryTimeNanos;
    +        private long nextRetryTimeMs;
     
             public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
                 this.msgId = msgId;
    -            this.nextRetryTimeNanos = nextRetryTime;
    +            this.nextRetryTimeMs = nextRetryTime;
                 LOG.debug("Created {}", this);
             }
     
             public void setNextRetryTime() {
    -            nextRetryTimeNanos = nextTime(msgId);
    +            nextRetryTimeMs = nextTime(msgId);
                 LOG.debug("Updated {}", this);
             }
     
    -        public boolean retry(long currentTimeNanos) {
    -            return nextRetryTimeNanos <= currentTimeNanos;
    +        public boolean retry(long currentTimeMs) {
    +            return nextRetryTimeMs <= currentTimeMs;
             }
     
             @Override
             public String toString() {
                 return "RetrySchedule{" +
                         "msgId=" + msgId +
    -                    ", nextRetryTime=" + nextRetryTimeNanos +
    +                    ", nextRetryTimeMs=" + nextRetryTimeMs +
                         '}';
             }
     
    +        @Override
    +        public int hashCode() {
    +            int hash = 5;
    +            hash = 29 * hash + Objects.hashCode(this.msgId);
    +            return hash;
    +        }
    +
    +        @Override
    +        public boolean equals(Object obj) {
    +            if (this == obj) {
    +                return true;
    +            }
    +            if (obj == null) {
    +                return false;
    +            }
    +            if (getClass() != obj.getClass()) {
    +                return false;
    +            }
    +            final RetrySchedule other = (RetrySchedule) obj;
    +            if (!Objects.equals(this.msgId, other.msgId)) {
    +                return false;
    +            }
    +            return true;
    +        }
    +
             public KafkaSpoutMessageId msgId() {
                 return msgId;
             }
     
    -        public long nextRetryTimeNanos() {
    -            return nextRetryTimeNanos;
    +        public long nextRetryTimeMs() {
    +            return nextRetryTimeMs;
             }
         }
     
         public static class TimeInterval implements Serializable {
    -        private final long lengthNanos;
    -        private final long length;
    -        private final TimeUnit timeUnit;
    +        private final long lengthMs;
     
             /**
              * @param length length of the time interval in the units specified by {@link TimeUnit}
    -         * @param timeUnit unit used to specify a time interval on which to specify a time unit
    +         * @param timeUnit unit used to specify a time interval on which to specify a time unit. Smallest supported unit is milliseconds
              */
             public TimeInterval(long length, TimeUnit timeUnit) {
    -            this.length = length;
    -            this.timeUnit = timeUnit;
    -            this.lengthNanos = timeUnit.toNanos(length);
    +            
    +            if(timeUnit == TimeUnit.MICROSECONDS || timeUnit == TimeUnit.NANOSECONDS) {
    +                throw new IllegalArgumentException("TimeInterval does not support time units smaller than milliseconds");
    +            }
    --- End diff --
    
    The class doesn't support time units less than milliseconds. I'd prefer to warn about misconfiguration rather than just clamping to 0 or 1 millisecond.
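To illustrate why rejecting sub-millisecond units is preferable to converting them silently: `TimeUnit.toMillis` truncates, so 500 microseconds becomes 0 ms. The sketch below mirrors the check in the diff using a hypothetical static helper, `toLengthMs`, rather than the actual `TimeInterval` constructor.

```java
import java.util.concurrent.TimeUnit;

final class TimeIntervalSketch {

    /**
     * Hypothetical helper: converts an interval to milliseconds and rejects
     * units that cannot be represented at millisecond resolution.
     */
    static long toLengthMs(long length, TimeUnit timeUnit) {
        if (timeUnit == TimeUnit.MICROSECONDS || timeUnit == TimeUnit.NANOSECONDS) {
            throw new IllegalArgumentException(
                    "TimeInterval does not support time units smaller than milliseconds");
        }
        return timeUnit.toMillis(length);
    }

    public static void main(String[] args) {
        // Silent conversion would lose the value entirely.
        System.out.println(TimeUnit.MICROSECONDS.toMillis(500)); // 0
        System.out.println(toLengthMs(2, TimeUnit.SECONDS));     // 2000
    }
}
```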



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r103353882
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -329,11 +328,13 @@ private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class
                 this.offsetCommitPeriodMs = offsetCommitPeriodMs;
                 return this;
             }
    -        
    +
             /**
              * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
              * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
    -         * of pending offsets bellow the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
    +         * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
    +         * Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than maxPollRecords - 1.
    +         * This limits maxPollRecords, since it doesn't make sense to allow larger polls than the spout is allowed to emit
    --- End diff --
    
    This threshold effectively limits maxPollRecords, since


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @hmcl Yes, I believe so. There is still a problem where the spout will stop emitting tuples if more than maxUncommittedOffsets tuples have failed.



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107537760
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java ---
    @@ -54,10 +55,11 @@
         boolean retainAll(Collection<TopicPartition> topicPartitions);
     
         /**
    -     * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
    -     * for which a tuple has failed and has retry time less than current time
    +     * @return The earliest retriable offset for each TopicPartition that has
    +     * offsets that are ready to be retried, i.e., for which a tuple has failed
    +     * and has retry time less than current time
          */
    -    Set<TopicPartition> retriableTopicPartitions();
    +    Map<TopicPartition, Long> earliestOffsetForEachRetriableTopicPartition();
    --- End diff --
    
    Let's rename this method to `earliestRetriableOffsets`. The javadoc will provide the details. Let's not write very complex and long method names.



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99869978
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -54,59 +56,92 @@
         private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
             @Override
             public int compare(RetrySchedule entry1, RetrySchedule entry2) {
    -            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            int result = Long.valueOf(entry1.nextRetryTimeMs()).compareTo(entry2.nextRetryTimeMs());
    +            
    +            if(result == 0) {
    +                //TreeSet uses compareTo instead of equals() for the Set contract
    +                //Ensure that we can save two retry schedules with the same timestamp
    +                result = entry1.hashCode() - entry2.hashCode();
    +            }
    +            return result;
             }
         }
     
         private class RetrySchedule {
             private final KafkaSpoutMessageId msgId;
    -        private long nextRetryTimeNanos;
    +        private long nextRetryTimeMs;
     
             public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
                 this.msgId = msgId;
    -            this.nextRetryTimeNanos = nextRetryTime;
    +            this.nextRetryTimeMs = nextRetryTime;
                 LOG.debug("Created {}", this);
             }
     
             public void setNextRetryTime() {
    -            nextRetryTimeNanos = nextTime(msgId);
    +            nextRetryTimeMs = nextTime(msgId);
                 LOG.debug("Updated {}", this);
             }
     
    -        public boolean retry(long currentTimeNanos) {
    -            return nextRetryTimeNanos <= currentTimeNanos;
    +        public boolean retry(long currentTimeMs) {
    +            return nextRetryTimeMs <= currentTimeMs;
             }
     
             @Override
             public String toString() {
                 return "RetrySchedule{" +
                         "msgId=" + msgId +
    -                    ", nextRetryTime=" + nextRetryTimeNanos +
    +                    ", nextRetryTimeMs=" + nextRetryTimeMs +
                         '}';
             }
     
    +        @Override
    +        public int hashCode() {
    +            int hash = 5;
    +            hash = 29 * hash + Objects.hashCode(this.msgId);
    +            return hash;
    +        }
    +
    +        @Override
    +        public boolean equals(Object obj) {
    +            if (this == obj) {
    +                return true;
    +            }
    +            if (obj == null) {
    +                return false;
    +            }
    +            if (getClass() != obj.getClass()) {
    +                return false;
    +            }
    +            final RetrySchedule other = (RetrySchedule) obj;
    +            if (!Objects.equals(this.msgId, other.msgId)) {
    +                return false;
    +            }
    +            return true;
    +        }
    +
             public KafkaSpoutMessageId msgId() {
                 return msgId;
             }
     
    -        public long nextRetryTimeNanos() {
    -            return nextRetryTimeNanos;
    +        public long nextRetryTimeMs() {
    +            return nextRetryTimeMs;
             }
         }
     
         public static class TimeInterval implements Serializable {
    -        private final long lengthNanos;
    -        private final long length;
    -        private final TimeUnit timeUnit;
    +        private final long lengthMs;
     
             /**
              * @param length length of the time interval in the units specified by {@link TimeUnit}
    -         * @param timeUnit unit used to specify a time interval on which to specify a time unit
    +         * @param timeUnit unit used to specify a time interval on which to specify a time unit. Smallest supported unit is milliseconds
              */
             public TimeInterval(long length, TimeUnit timeUnit) {
    -            this.length = length;
    -            this.timeUnit = timeUnit;
    -            this.lengthNanos = timeUnit.toNanos(length);
    +            
    +            if(timeUnit == TimeUnit.MICROSECONDS || timeUnit == TimeUnit.NANOSECONDS) {
    +                throw new IllegalArgumentException("TimeInterval does not support time units smaller than milliseconds");
    +            }
    --- End diff --
    
    same comment as above



[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @HeartSaVioR I think as long as the spout can seek to the tuples it needs to retry instead of seeking to the committed offset, all of the minor issues I could think of are resolved. 



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r104615050
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---
    @@ -0,0 +1,245 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertThat;
    +import static org.mockito.Matchers.anyObject;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.reset;
    +import static org.mockito.Mockito.spy;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +
    +import info.batey.kafka.unit.KafkaUnitRule;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +import kafka.producer.KeyedMessage;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.utils.Time;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.mockito.ArgumentCaptor;
    +import org.mockito.MockitoAnnotations;
    +
    +public class MaxUncommittedOffsetTest {
    +
    +    @Rule
    +    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
    +
    +    private final TopologyContext topologyContext = mock(TopologyContext.class);
    +    private final Map<String, Object> conf = new HashMap<>();
    +    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
    +    private final long commitOffsetPeriodMs = 2_000;
    +    private final int numMessages = 100;
    +    private final int maxUncommittedOffsets = 10;
    +    //This is set to be the same as maxUncommittedOffsets since it is difficult to test maxUncommittedOffsets when maxPollRecords is not the same
    +    //If maxPollRecords is larger, a single call to poll will emit more than maxUncommittedOffsets messages
    +    //If maxPollRecords is lower, it will cap how far past the commit offset the spout can read when there are failed tuples ready for retry
    +    private final int maxPollRecords = maxUncommittedOffsets;
    +    private final int initialRetryDelaySecs = 60;
    +    private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaPort())
    +        .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
    +        .setMaxPollRecords(maxPollRecords)
    +        .setMaxUncommittedOffsets(maxUncommittedOffsets)
    +        .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
    +            1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute
    +        .build();
    +    private KafkaConsumer<String, String> consumerSpy;
    +    private KafkaConsumerFactory<String, String> consumerFactory;
    +    private KafkaSpout<String, String> spout;
    +
    +    @Before
    +    public void setUp() {
    +        MockitoAnnotations.initMocks(this);
    +        this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
    +        this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
    +        this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
    +    }
    +
    +    private void populateTopicData(String topicName, int msgCount) {
    +        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
    +
    +        IntStream.range(0, msgCount).forEach(value -> {
    +            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
    +                topicName, Integer.toString(value),
    +                Integer.toString(value));
    +
    +            kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
    +        });
    +    }
    +
    +    private void initializeSpout(int msgCount) {
    +        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
    +        spout.open(conf, topologyContext, collector);
    +        spout.activate();
    +    }
    +
    +    public ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) {
    +        //The spout must respect maxUncommittedOffsets when requesting/emitting tuples
    +        initializeSpout(messageCount);
    +
    +        //Try to emit all messages. Ensure only maxUncommittedOffsets are emitted
    +        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    +        for (int i = 0; i < messageCount; i++) {
    +            spout.nextTuple();
    +        };
    +        verify(collector, times(maxUncommittedOffsets)).emit(
    +            anyObject(),
    +            anyObject(),
    +            messageIds.capture());
    +        return messageIds;
    +    }
    +
    +    @Test
    +    public void testNextTupleCanEmitMoreMessagesWhenDroppingBelowMaxUncommittedOffsetsDueToCommit() {
    +        //The spout must respect maxUncommittedOffsets after committing a set of records
    +        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
    +            //First check that maxUncommittedOffsets is respected when emitting from scratch
    +            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
    +            reset(collector);
    +
    +            //Ack all emitted messages and commit them
    +            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
    +                spout.ack(messageId);
    +            }
    +            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
    +            spout.nextTuple();
    +
    +            //Now check that the spout will emit another maxUncommittedOffsets messages
    +            for (int i = 0; i < numMessages; i++) {
    +                spout.nextTuple();
    +            }
    +            verify(collector, times(maxUncommittedOffsets)).emit(
    +                anyObject(),
    +                anyObject(),
    +                anyObject());
    +        }
    +    }
    +
    +    @Test
    +    public void testNextTupleWillRespectMaxUncommittedOffsetsWhenThereAreAckedUncommittedTuples() {
    +        //The spout must respect maxUncommittedOffsets even if some tuples have been acked but not committed
    +        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
    +            //First check that maxUncommittedOffsets is respected when emitting from scratch
    +            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
    +            reset(collector);
    +
    +            //Fail all emitted messages except the last one. Try to commit.
    +            List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
    +            for (int i = 0; i < messageIdList.size() - 1; i++) {
    +                spout.fail(messageIdList.get(i));
    +            }
    +            spout.ack(messageIdList.get(messageIdList.size() - 1));
    +            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
    +            spout.nextTuple();
    +
    +            //Now check that the spout will not emit anything else since nothing has been committed
    +            for (int i = 0; i < numMessages; i++) {
    +                spout.nextTuple();
    +            }
    +
    +            verify(collector, times(0)).emit(
    +                anyObject(),
    +                anyObject(),
    +                anyObject());
    +        }
    +    }
    +
    +    private void failAllExceptTheFirstMessageThenCommit(ArgumentCaptor<KafkaSpoutMessageId> messageIds) {
    +        //Fail all emitted messages except the first. Commit the first.
    +        List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
    +        for (int i = 1; i < messageIdList.size(); i++) {
    +            spout.fail(messageIdList.get(i));
    +        }
    +        spout.ack(messageIdList.get(0));
    +        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
    +        spout.nextTuple();
    +    }
    +
    +    @Test
    +    public void testNextTupleWillNotEmitMoreThanMaxUncommittedOffsetsPlusMaxPollRecordsMessages() {
    +        //The upper bound on uncommitted offsets should be maxUncommittedOffsets + maxPollRecords
    +        //This is reachable by emitting maxUncommittedOffsets messages, acking the first message, then polling.
    +        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
    +            //First check that maxUncommittedOffsets is respected when emitting from scratch
    +            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
    +            reset(collector);
    +
    +            failAllExceptTheFirstMessageThenCommit(messageIds);
    +
    +            //Offset 0 is acked, 1 to maxUncommittedOffsets - 1 are failed
    +            //The spout should now emit another maxPollRecords messages
    +            //This is allowed because the acked message brings the numUncommittedOffsets below the cap
    +            for (int i = 0; i < maxUncommittedOffsets; i++) {
    +                spout.nextTuple();
    +            }
    +
    +            ArgumentCaptor<KafkaSpoutMessageId> secondRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    +            verify(collector, times(maxPollRecords)).emit(
    +                anyObject(),
    +                anyObject(),
    +                secondRunMessageIds.capture());
    +            reset(collector);
    +
    +            List<Long> firstRunOffsets = messageIds.getAllValues().stream()
    +                .map(messageId -> messageId.offset())
    +                .collect(Collectors.toList());
    +            List<Long> secondRunOffsets = secondRunMessageIds.getAllValues().stream()
    +                .map(messageId -> messageId.offset())
    +                .collect(Collectors.toList());
    +            assertThat("Expected the newly emitted messages to have no overlap with the first batch", secondRunOffsets.removeAll(firstRunOffsets), is(false));
    +
    +            //Offset 0 is acked, 1 to maxUncommittedOffsets-1 are failed, maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
    +            //There are now maxUncommittedOffsets-1 + maxPollRecords records emitted past the last committed offset
    +            //Retry the failed messages and ack the first one.
    +            Time.advanceTimeSecs(initialRetryDelaySecs);
    +            for (int i = 0; i < numMessages; i++) {
    +                spout.nextTuple();
    +            }
    +            ArgumentCaptor<KafkaSpoutMessageId> thirdRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    +            verify(collector, times(maxPollRecords-1)).emit(
    +                anyObject(),
    +                anyObject(),
    +                thirdRunMessageIds.capture());
    +            reset(collector);
    +
    +            failAllExceptTheFirstMessageThenCommit(thirdRunMessageIds);
    --- End diff --
    
    Minor: it might be better to compare the captured message ids with the first run's message ids (excluding the first one) to make sure the failed tuples are re-emitted.
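
A minimal sketch of the suggested comparison, assuming the offsets have already been extracted from the captured message ids (as the test does with `messageId.offset()`). The helper class and method names are hypothetical and not part of the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the PR: checks that a retry run re-emitted
// exactly the failed offsets, i.e. the first run minus the acked first offset.
final class ReemitCheck {

    static boolean reemittedAllFailed(List<Long> firstRunOffsets, List<Long> retryRunOffsets) {
        if (firstRunOffsets.isEmpty()) {
            return retryRunOffsets.isEmpty();
        }
        // The first emitted offset was acked, so it must not be retried.
        List<Long> expected = new ArrayList<>(firstRunOffsets.subList(1, firstRunOffsets.size()));
        List<Long> actual = new ArrayList<>(retryRunOffsets);
        // Compare as sorted lists so the emit order does not matter.
        Collections.sort(expected);
        Collections.sort(actual);
        return expected.equals(actual);
    }

    public static void main(String[] args) {
        List<Long> firstRun = Arrays.asList(0L, 1L, 2L, 3L);
        System.out.println(reemittedAllFailed(firstRun, Arrays.asList(1L, 2L, 3L))); // prints true
        System.out.println(reemittedAllFailed(firstRun, Arrays.asList(0L, 1L, 2L))); // prints false
    }
}
```

In the test itself, the same check could be expressed as an `assertThat` on the two offset lists.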



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99658822
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -54,59 +56,92 @@
         private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
             @Override
             public int compare(RetrySchedule entry1, RetrySchedule entry2) {
    -            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            int result = Long.valueOf(entry1.nextRetryTimeMs()).compareTo(entry2.nextRetryTimeMs());
    +            
    +            if(result == 0) {
    +                //TreeSet uses compareTo instead of equals() for the Set contract
    +                //Ensure that we can save two retry schedules with the same timestamp
    +                result = entry1.hashCode() - entry2.hashCode();
    +            }
    +            return result;
             }
         }
     
         private class RetrySchedule {
             private final KafkaSpoutMessageId msgId;
    -        private long nextRetryTimeNanos;
    +        private long nextRetryTimeMs;
     
             public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
                 this.msgId = msgId;
    -            this.nextRetryTimeNanos = nextRetryTime;
    +            this.nextRetryTimeMs = nextRetryTime;
                 LOG.debug("Created {}", this);
             }
     
             public void setNextRetryTime() {
    -            nextRetryTimeNanos = nextTime(msgId);
    +            nextRetryTimeMs = nextTime(msgId);
                 LOG.debug("Updated {}", this);
             }
     
    -        public boolean retry(long currentTimeNanos) {
    -            return nextRetryTimeNanos <= currentTimeNanos;
    +        public boolean retry(long currentTimeMs) {
    +            return nextRetryTimeMs <= currentTimeMs;
             }
     
             @Override
             public String toString() {
                 return "RetrySchedule{" +
                         "msgId=" + msgId +
    -                    ", nextRetryTime=" + nextRetryTimeNanos +
    +                    ", nextRetryTimeMs=" + nextRetryTimeMs +
                         '}';
             }
     
    +        @Override
    +        public int hashCode() {
    --- End diff --
    
    I'll remove these two again; the Comparator change should work even with the default hashCode().
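
To illustrate why the comparator needs a tie-breaker, here is a small sketch. `RetryEntry` is a hypothetical stand-in for `RetrySchedule`, and the tie-breaker uses the offset field rather than the hash code from the diff: this is a swapped-in choice for the demo, since hash codes can collide and subtracting two of them can overflow.

```java
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical stand-in for RetrySchedule; only the fields needed for the demo.
final class RetryEntry {
    final long offset;
    final long nextRetryTimeMs;

    RetryEntry(long offset, long nextRetryTimeMs) {
        this.offset = offset;
        this.nextRetryTimeMs = nextRetryTimeMs;
    }
}

final class RetryOrderingDemo {
    // Orders by timestamp only: entries with equal timestamps compare as 0,
    // so a TreeSet treats them as duplicates and keeps just one of them.
    static final Comparator<RetryEntry> BY_TIME_ONLY =
        Comparator.comparingLong((RetryEntry e) -> e.nextRetryTimeMs);

    // Breaks ties on the offset so two entries with the same timestamp
    // are both stored.
    static final Comparator<RetryEntry> BY_TIME_THEN_OFFSET =
        BY_TIME_ONLY.thenComparingLong(e -> e.offset);

    // Adds two entries scheduled for the same time and reports how many survive.
    static int storedCount(Comparator<RetryEntry> comparator) {
        TreeSet<RetryEntry> schedule = new TreeSet<>(comparator);
        schedule.add(new RetryEntry(1L, 1000L));
        schedule.add(new RetryEntry(2L, 1000L)); // same retry timestamp
        return schedule.size();
    }

    public static void main(String[] args) {
        System.out.println(storedCount(BY_TIME_ONLY));        // prints 1
        System.out.println(storedCount(BY_TIME_THEN_OFFSET)); // prints 2
    }
}
```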



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r106687999
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java ---
    @@ -41,31 +41,22 @@ public static Config getConfig() {
     
         public static StormTopology getTopologyKafkaSpout(int port) {
             final TopologyBuilder tp = new TopologyBuilder();
    -        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(port)), 1);
    +        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfigBuilder(port).build()), 1);
    --- End diff --
    
    The SingleTopicKafkaSpoutConfiguration class was beginning to turn into a mess of telescoping constructors, and it was a bit silly given that we already have a nice builder interface underlying that class.



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107529960
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -273,11 +273,9 @@ public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
         // ======== poll =========
         private ConsumerRecords<K, V> pollKafkaBroker() {
             Set<TopicPartition> retriableTopicPartitions = doSeekRetriableTopicPartitions();
    -        Set<TopicPartition> partitionsToPause = getPartitionsToPauseToEnforceUncommittedOffsetsLimit(retriableTopicPartitions);
             try{
    -            if(!partitionsToPause.isEmpty()) {
    -                kafkaConsumer.pause(partitionsToPause);
    -            }
    +            //Pause all partitions with no retriable messages
    +            pauseNonRetriablePartitionsToEnforceUncommitedOffsetsLimit(retriableTopicPartitions);
    --- End diff --
    
    retriableTopicPartitions could be inlined, but no big deal.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r106547911
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -270,17 +272,40 @@ public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
     
         // ======== poll =========
         private ConsumerRecords<K, V> pollKafkaBroker() {
    -        doSeekRetriableTopicPartitions();
    -        if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
    -            kafkaSpoutConfig.getSubscription().refreshAssignment();
    +        Set<TopicPartition> retriableTopicPartitions = doSeekRetriableTopicPartitions();
    +        Set<TopicPartition> partitionsToPause = getPartitionsToPauseToEnforceUncommittedOffsetsLimit(retriableTopicPartitions);
    +        try{
    +            if(!partitionsToPause.isEmpty()) {
    +                kafkaConsumer.pause(partitionsToPause);
    +            }
    +            if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
    +                kafkaSpoutConfig.getSubscription().refreshAssignment();
    +            }
    +            final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    +            final int numPolledRecords = consumerRecords.count();
    +            LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets);
    +            return consumerRecords;
    +        } finally {
    +            if (!partitionsToPause.isEmpty()) {
    --- End diff --
    
    This `if` is not necessary. Let's remove it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99876099
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -289,6 +293,13 @@ private void doSeekRetriableTopicPartitions() {
                     kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1);    // Seek to last committed offset
                 }
             }
    +        
    +        if(!retriableTopicPartitions.isEmpty()) {
    +            //Pause other partitions for this poll so maxUncommittedPartitions isn't exceeded by too much
    +            Set<TopicPartition> assignment = new HashSet<>(kafkaConsumer.assignment());
    --- End diff --
    
    let's call this variable 'paused' or something along those lines.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r104774078
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -270,17 +272,36 @@ public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
     
         // ======== poll =========
         private ConsumerRecords<K, V> pollKafkaBroker() {
    -        doSeekRetriableTopicPartitions();
    +        Set<TopicPartition> retriableTopicPartitions = doSeekRetriableTopicPartitions();
    +        Set<TopicPartition> pausedTopicPartitions = pauseNonRetriableTopicPartitionsIfAtUncommittedOffsetsLimit(retriableTopicPartitions);
             if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
                 kafkaSpoutConfig.getSubscription().refreshAssignment();
             }
             final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    --- End diff --
    
    Will fix


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @hmcl yes, more or less.



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99940042
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -54,59 +56,92 @@
         private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
             @Override
             public int compare(RetrySchedule entry1, RetrySchedule entry2) {
    -            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            int result = Long.valueOf(entry1.nextRetryTimeMs()).compareTo(entry2.nextRetryTimeMs());
    +            
    +            if(result == 0) {
    +                //TreeSet uses compareTo instead of equals() for the Set contract
    +                //Ensure that we can save two retry schedules with the same timestamp
    +                result = entry1.hashCode() - entry2.hashCode();
    +            }
    +            return result;
             }
         }
     
         private class RetrySchedule {
             private final KafkaSpoutMessageId msgId;
    -        private long nextRetryTimeNanos;
    +        private long nextRetryTimeMs;
    --- End diff --
    
    I'll take a look at putting nanosecond support into o.a.s.Time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    Squashed and rebased



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99635118
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -329,11 +328,13 @@ private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class
                 this.offsetCommitPeriodMs = offsetCommitPeriodMs;
                 return this;
             }
    -        
    +
             /**
              * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
              * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
    -         * of pending offsets bellow the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
    +         * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
    +         * Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than 2*maxPollRecords.
    --- End diff --
    
    Where does the number 2*maxPollRecords come from?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99931862
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -289,6 +293,13 @@ private void doSeekRetriableTopicPartitions() {
                     kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1);    // Seek to last committed offset
                 }
             }
    +        
    +        if(!retriableTopicPartitions.isEmpty()) {
    +            //Pause other partitions for this poll so maxUncommittedPartitions isn't exceeded by too much
    --- End diff --
    
    I still don't quite understand why we need to pause all other topic partitions in the case of a retry. This could lead to a scenario where one topic partition is causing a lot of failures while all the others are running fine. If that's the case, we will be throttling all other topic partitions for no particular reason.
    
    Are you doing this pause to handle the case when `numUncommittedOffsets` may exceed `maxUncommittedOffsets`? If so, then in the case of a retry we should only pause when `numUncommittedOffsets >= maxUncommittedOffsets`. Otherwise, perhaps we should poll from all topic partitions.
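
The rule proposed here can be sketched as follows. This is only an illustration under stated assumptions: partitions are modelled as plain strings to keep the example free of Kafka types, and the class and method names are hypothetical rather than taken from the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the suggested rule, not the spout's actual code.
final class PauseDecision {

    // Returns the partitions to pause for the next poll.
    static Set<String> partitionsToPause(Set<String> assignment, Set<String> retriable,
                                         int numUncommittedOffsets, int maxUncommittedOffsets) {
        if (numUncommittedOffsets < maxUncommittedOffsets) {
            // Below the limit: poll from every assigned partition.
            return Collections.emptySet();
        }
        // At or above the limit: only partitions with retriable tuples stay active.
        Set<String> toPause = new HashSet<>(assignment);
        toPause.removeAll(retriable);
        return toPause;
    }

    public static void main(String[] args) {
        Set<String> assignment = new HashSet<>(Arrays.asList("t-0", "t-1", "t-2"));
        Set<String> retriable = Collections.singleton("t-1");
        System.out.println(partitionsToPause(assignment, retriable, 5, 10).size());  // prints 0
        System.out.println(partitionsToPause(assignment, retriable, 10, 10).size()); // prints 2
    }
}
```

With this rule, a single failing partition no longer throttles healthy partitions while the spout is still under the limit.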
    




[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r106559751
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -270,17 +272,40 @@ public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
     
         // ======== poll =========
         private ConsumerRecords<K, V> pollKafkaBroker() {
    -        doSeekRetriableTopicPartitions();
    -        if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
    -            kafkaSpoutConfig.getSubscription().refreshAssignment();
    +        Set<TopicPartition> retriableTopicPartitions = doSeekRetriableTopicPartitions();
    +        Set<TopicPartition> partitionsToPause = getPartitionsToPauseToEnforceUncommittedOffsetsLimit(retriableTopicPartitions);
    --- End diff --
    
    Let's replace lines 276 to 280 with the code I suggested [here](https://github.com/apache/storm/pull/1924/files#r106557594).
    
    The `!partitionsToPause.isEmpty()` check is not necessary.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99634206
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -54,59 +56,92 @@
         private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
             @Override
             public int compare(RetrySchedule entry1, RetrySchedule entry2) {
    -            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            int result = Long.valueOf(entry1.nextRetryTimeMs()).compareTo(entry2.nextRetryTimeMs());
    +            
    +            if(result == 0) {
    +                //TreeSet uses compareTo instead of equals() for the Set contract
    +                //Ensure that we can save two retry schedules with the same timestamp
    +                result = entry1.hashCode() - entry2.hashCode();
    +            }
    +            return result;
             }
         }
     
         private class RetrySchedule {
             private final KafkaSpoutMessageId msgId;
    -        private long nextRetryTimeNanos;
    +        private long nextRetryTimeMs;
    --- End diff --
    
    Any special reason to change this to ms?


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r106689294
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -270,17 +272,40 @@ public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
     
         // ======== poll =========
         private ConsumerRecords<K, V> pollKafkaBroker() {
    -        doSeekRetriableTopicPartitions();
    -        if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
    -            kafkaSpoutConfig.getSubscription().refreshAssignment();
    +        Set<TopicPartition> retriableTopicPartitions = doSeekRetriableTopicPartitions();
    +        Set<TopicPartition> partitionsToPause = getPartitionsToPauseToEnforceUncommittedOffsetsLimit(retriableTopicPartitions);
    --- End diff --
    
    I wasn't sure if calling resume() after every poll() might be expensive, but it appears to just set a flag in the consumer. I'll change it.


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107545196
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -273,11 +273,9 @@ public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
         // ======== poll =========
         private ConsumerRecords<K, V> pollKafkaBroker() {
             Set<TopicPartition> retriableTopicPartitions = doSeekRetriableTopicPartitions();
    -        Set<TopicPartition> partitionsToPause = getPartitionsToPauseToEnforceUncommittedOffsetsLimit(retriableTopicPartitions);
             try{
    -            if(!partitionsToPause.isEmpty()) {
    -                kafkaConsumer.pause(partitionsToPause);
    -            }
    +            //Pause all partitions with no retriable messages
    +            pauseNonRetriablePartitionsToEnforceUncommitedOffsetsLimit(retriableTopicPartitions);
    --- End diff --
    
    I'm not sure that's more readable though


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @hmcl 
    I think @srdo addressed your last review comments. Could you finish the review?
    Please let me know if there are points we still need to discuss or resolve, so that we can address them now, or file new issues and postpone them to the next release if possible.


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107536862
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---
    @@ -266,4 +269,36 @@ public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
                 verifyAllMessagesCommitted(messageCount);
             }
         }
    +    
    +    @Test
    +    public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
    +        int messageCount = 10;
    +        initializeSpout(messageCount);
    +
    +        //play all tuples
    +        for (int i = 0; i < messageCount; i++) {
    +            spout.nextTuple();
    +        }
    +        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    +        verify(collector, times(messageCount)).emit(anyObject(), anyObject(), messageIds.capture());
    +        reset(collector);
    +        //Fail tuple 5 and 3, call nextTuple, then fail tuple 2
    +        List<KafkaSpoutMessageId> capturedMessageIds = messageIds.getAllValues();
    +        spout.fail(capturedMessageIds.get(5));
    +        spout.fail(capturedMessageIds.get(3));
    +        spout.nextTuple();
    --- End diff --
    
    What's the relation between this call to nextTuple and the failed messages? Why is it needed?


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99869535
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -54,59 +56,92 @@
         private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
             @Override
             public int compare(RetrySchedule entry1, RetrySchedule entry2) {
    -            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            int result = Long.valueOf(entry1.nextRetryTimeMs()).compareTo(entry2.nextRetryTimeMs());
    +            
    +            if(result == 0) {
    +                //TreeSet uses compareTo instead of equals() for the Set contract
    +                //Ensure that we can save two retry schedules with the same timestamp
    +                result = entry1.hashCode() - entry2.hashCode();
    +            }
    +            return result;
             }
         }
     
         private class RetrySchedule {
             private final KafkaSpoutMessageId msgId;
    -        private long nextRetryTimeNanos;
    +        private long nextRetryTimeMs;
    --- End diff --
    
    I would rather change the simulation code than the entire client code. In my opinion ms is already a large unit at the processor level, and it could be useful to provide more fine-grained control.


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99638011
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -54,59 +56,92 @@
         private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
             @Override
             public int compare(RetrySchedule entry1, RetrySchedule entry2) {
    -            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            int result = Long.valueOf(entry1.nextRetryTimeMs()).compareTo(entry2.nextRetryTimeMs());
    +            
    +            if(result == 0) {
    +                //TreeSet uses compareTo instead of equals() for the Set contract
    +                //Ensure that we can save two retry schedules with the same timestamp
    +                result = entry1.hashCode() - entry2.hashCode();
    +            }
    +            return result;
             }
         }
     
         private class RetrySchedule {
             private final KafkaSpoutMessageId msgId;
    -        private long nextRetryTimeNanos;
    +        private long nextRetryTimeMs;
    --- End diff --
    
    Storm's time simulation doesn't support values smaller than ms. I'd be happy to implement nanosecond support in Time instead if anyone actually needs sub-millis support.


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107455723
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -60,17 +61,19 @@ public int compare(RetrySchedule entry1, RetrySchedule entry2) {
                 if(result == 0) {
                     //TreeSet uses compareTo instead of equals() for the Set contract
                     //Ensure that we can save two retry schedules with the same timestamp
    -                result = entry1.hashCode() - entry2.hashCode();
    +                result = entry1.instanceNumber - entry2.instanceNumber;
    --- End diff --
    
    I would rather leave this with hashCode(), or even as it was initially with equals(), but hashCode() is OK. This _instanceNumber_ code is very cryptic as is. Furthermore, this logic accounts for a hypothetical scenario that is very unlikely to happen, maybe never, at least in production. Let's keep it simple. Code readability and simplicity are very important.
    
    Let's revert the whole change in this class.
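
For context on the tie-breaker being debated here: a `TreeSet` treats two elements as duplicates whenever its comparator returns 0, so a comparator that looks only at the retry timestamp will silently drop a second entry scheduled for the same time. The sketch below illustrates that behavior with hypothetical names (`Entry`, `seq`, `BY_TIME_ONLY`, `BY_TIME_THEN_SEQ`); it is not the code from the PR. It breaks ties with a per-entry sequence number compared via `thenComparingLong`, which also avoids the overflow that subtracting two ints (as in `hashCode() - hashCode()`) can produce.

```java
import java.util.Comparator;
import java.util.TreeSet;

final class RetryTieBreakSketch {
    // Hypothetical stand-in for a retry schedule entry.
    static final class Entry {
        final long nextRetryTimeMs;
        final long seq; // assumed to be unique per entry

        Entry(long nextRetryTimeMs, long seq) {
            this.nextRetryTimeMs = nextRetryTimeMs;
            this.seq = seq;
        }
    }

    // Orders by timestamp only: two entries with equal timestamps compare as 0.
    static final Comparator<Entry> BY_TIME_ONLY =
        (a, b) -> Long.compare(a.nextRetryTimeMs, b.nextRetryTimeMs);

    // Orders by timestamp, then by the unique sequence number.
    static final Comparator<Entry> BY_TIME_THEN_SEQ =
        BY_TIME_ONLY.thenComparingLong(e -> e.seq);

    // Adds two distinct entries with the same timestamp and reports how many survive.
    static int sizeAfterAddingTwoWithSameTime(Comparator<Entry> comparator) {
        TreeSet<Entry> schedule = new TreeSet<>(comparator);
        schedule.add(new Entry(1000L, 1L));
        schedule.add(new Entry(1000L, 2L));
        return schedule.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterAddingTwoWithSameTime(BY_TIME_ONLY));     // prints 1
        System.out.println(sizeAfterAddingTwoWithSameTime(BY_TIME_THEN_SEQ)); // prints 2
    }
}
```

Whether equal timestamps can occur in practice depends on the clock in use; with millisecond resolution or simulated time they are easier to hit than with `System.nanoTime()`.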


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r106557594
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -270,17 +272,40 @@ public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
     
         // ======== poll =========
         private ConsumerRecords<K, V> pollKafkaBroker() {
    -        doSeekRetriableTopicPartitions();
    -        if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
    -            kafkaSpoutConfig.getSubscription().refreshAssignment();
    +        Set<TopicPartition> retriableTopicPartitions = doSeekRetriableTopicPartitions();
    +        Set<TopicPartition> partitionsToPause = getPartitionsToPauseToEnforceUncommittedOffsetsLimit(retriableTopicPartitions);
    +        try{
    +            if(!partitionsToPause.isEmpty()) {
    +                kafkaConsumer.pause(partitionsToPause);
    +            }
    +            if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
    +                kafkaSpoutConfig.getSubscription().refreshAssignment();
    +            }
    +            final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    +            final int numPolledRecords = consumerRecords.count();
    +            LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets);
    +            return consumerRecords;
    +        } finally {
    +            if (!partitionsToPause.isEmpty()) {
    +                //Ensure that we resume all partitions in case some were paused when processing retries
    +                kafkaConsumer.resume(kafkaConsumer.assignment());
    +            }
             }
    -        final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    -        final int numPolledRecords = consumerRecords.count();
    -        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets);
    -        return consumerRecords;
         }
    -
    -    private void doSeekRetriableTopicPartitions() {
    +    
    +    private Set<TopicPartition> getPartitionsToPauseToEnforceUncommittedOffsetsLimit(Set<TopicPartition> retriableTopicPartitions) {
    --- End diff --
    
    To make the code more readable, let's add a method comment explaining the idea and replace this method with something along these lines:
    ```
    private void pauseNonRetriableTpsToEnforceUncommitedOffsetsLimit(Set<TopicPartition> retriableTopicPartitions) {
        final Set<TopicPartition> partitionsToPause = new HashSet<>(kafkaConsumer.assignment());
        partitionsToPause.removeAll(retriableTopicPartitions);
        kafkaConsumer.pause(partitionsToPause);
        // no need to return anything
    }
    ```
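
The suggestion above boils down to a set difference: every assigned partition that has no retriable tuples gets paused. A minimal, self-contained sketch of just that step follows. Partitions are plain strings here so that it runs without the Kafka client on the classpath, and the class and method names (`PausePartitionsSketch`, `partitionsToPause`) are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class PausePartitionsSketch {
    // Returns the assigned partitions that have no retriable tuples.
    static Set<String> partitionsToPause(Set<String> assignment, Set<String> retriable) {
        // Work on a copy so the caller's assignment set is not modified.
        Set<String> toPause = new HashSet<>(assignment);
        toPause.removeAll(retriable);
        return toPause;
    }

    public static void main(String[] args) {
        Set<String> assignment = new HashSet<>(Arrays.asList("topic-0", "topic-1", "topic-2"));
        Set<String> retriable = new HashSet<>(Arrays.asList("topic-1"));
        // Only topic-0 and topic-2 remain, since topic-1 has retriable tuples.
        System.out.println(partitionsToPause(assignment, retriable));
    }
}
```

If every assigned partition has retriable tuples the result is simply empty, which fits the remark elsewhere in this thread that an explicit `isEmpty()` guard is not needed.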


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    Thanks @srdo for your patience. Merged into 1.x & master.


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by pavankumarp89 <gi...@git.apache.org>.
Github user pavankumarp89 commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    Can you please let us know if this fix will be applied to a lower version, 1.0.4 or 1.0.3?


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r106532445
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -404,6 +407,14 @@ private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class
             }
             
             public KafkaSpoutConfig<K,V> build() {
    +            
    +            Number maxPollRecords = (Number)kafkaProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
    +            if(maxPollRecords == null) {
    +                setMaxPollRecords(maxUncommittedOffsets);
    --- End diff --
    
    @srdo I don't agree with this. If the user does not set MAX_POLL_RECORDS_CONFIG, this value should keep its Kafka default. It should not silently be set to maxUncommittedOffsets. They are two unrelated things, although I agree that MAX_POLL_RECORDS_CONFIG should not exceed maxUncommittedOffsets.
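
One way to read this request is as a validation step in `build()` rather than a silent default. The sketch below is an assumption about how that could look, not the code that was merged: it leaves the property untouched when the user has not set it, and rejects an explicit value larger than `maxUncommittedOffsets`. The class and method names are hypothetical, and throwing (instead of capping) is just one possible choice. The property key is the string behind `ConsumerConfig.MAX_POLL_RECORDS_CONFIG`.

```java
import java.util.HashMap;
import java.util.Map;

final class MaxPollRecordsCheckSketch {
    // String value of ConsumerConfig.MAX_POLL_RECORDS_CONFIG.
    static final String MAX_POLL_RECORDS = "max.poll.records";

    // Leaves the property alone when unset, so the Kafka default applies,
    // and rejects an explicit value that exceeds maxUncommittedOffsets.
    static void validate(Map<String, Object> kafkaProps, int maxUncommittedOffsets) {
        Object configured = kafkaProps.get(MAX_POLL_RECORDS);
        if (configured == null) {
            return; // not set by the user: do not touch it
        }
        int maxPollRecords = ((Number) configured).intValue();
        if (maxPollRecords > maxUncommittedOffsets) {
            throw new IllegalArgumentException(
                MAX_POLL_RECORDS + " (" + maxPollRecords + ") must not exceed maxUncommittedOffsets ("
                    + maxUncommittedOffsets + ")");
        }
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        validate(props, 100);               // unset: accepted, map stays empty
        props.put(MAX_POLL_RECORDS, 50);
        validate(props, 100);               // 50 <= 100: accepted
        props.put(MAX_POLL_RECORDS, 500);
        try {
            validate(props, 100);           // 500 > 100: rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```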


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r106541357
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -54,21 +55,28 @@
         private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
             @Override
             public int compare(RetrySchedule entry1, RetrySchedule entry2) {
    -            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            
    +            if(result == 0) {
    +                //TreeSet uses compareTo instead of equals() for the Set contract
    +                //Ensure that we can save two retry schedules with the same timestamp
    --- End diff --
    
    Isn't this impossible? The KafkaSpout is single-threaded. When it calls schedule(msgId), the scheduled time will be associated with System.nanoTime().


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107531160
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java ---
    @@ -54,10 +55,11 @@
         boolean retainAll(Collection<TopicPartition> topicPartitions);
     
         /**
    -     * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
    -     * for which a tuple has failed and has retry time less than current time
    +     * @return The earliest retriable offset for each TopicPartition that has
    +     * offsets that are ready to be retried, i.e., for which a tuple has failed
    --- End diff --
    
    offsets ready to be retried


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    Very sorry about this, but this is still broken :( 
    
    In the current implementation (pre this PR), the spout can be prevented from polling if there are more than maxUncommittedOffsets failed tuples. This is because the check for whether to poll isn't accounting for retriable tuples already being counted in the numUncommittedOffsets count.
    
    The current fix is to allow failed tuples to retry always, even if maxUncommittedOffsets is exceeded, because failed tuples don't contribute further to the numUncommittedOffsets count. The problem has been to ensure that we don't also emit a bunch of new tuples when polling for retriable tuples, and end up ignoring maxUncommittedOffsets entirely. This was partially fixed by pausing partitions that have no retriable tuples.
    
    With the previous seeking behavior, this was a complete fix, since maxPollRecords put a bound on how far the spout could read from the commit offset in a poll that was ignoring maxUncommittedOffsets. With the new seeking behavior this is broken. If the spout has emitted as many tuples as allowed, and the last (highest offset) tuple fails, the spout may now poll for a full batch of new tuples, starting from the failed tuple. This scenario can repeat arbitrarily many times, so maxUncommittedOffsets is completely ineffective. 
    
    We don't want to go back to the old seeking behavior (IMO), because it meant that in the case where maxPollRecords is much lower than maxUncommittedOffsets (almost always), the spout might end up choking on failed tuples. For example, if maxPollRecords is 5, and tuples 0-4 are not ready for retry (they might have been retried already, and are now waiting for retry backoff), but tuples 5-9 are, the spout is unable to retry 5-9 (or anything else on that partition) because it keeps seeking back to 0 and polling out the first 5 tuples. Seeking directly to the retriable tuples should in most cases be more efficient as well, because in the old implementation we'd just be seeking to the last committed offset, polling, and discarding tuples until we reach the ones that can be retried.
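
The example in the paragraph above can be replayed with a toy model of a single poll. This is only a sketch under simplifying assumptions: one partition, a poll that returns up to `maxPollRecords` consecutive offsets from the seek position, and polled offsets that are not ready for retry being discarded. The names (`SeekStrategySketch`, `retriedInOnePoll`) are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class SeekStrategySketch {
    // Simulates one poll: reads up to maxPollRecords offsets starting at seekOffset
    // (bounded by logEndOffset) and keeps only the ones that are ready for retry.
    static List<Long> retriedInOnePoll(long seekOffset, int maxPollRecords,
                                       long logEndOffset, Set<Long> readyForRetry) {
        List<Long> retried = new ArrayList<>();
        long end = Math.min(logEndOffset, seekOffset + maxPollRecords);
        for (long offset = seekOffset; offset < end; offset++) {
            if (readyForRetry.contains(offset)) {
                retried.add(offset);
            }
        }
        return retried;
    }

    public static void main(String[] args) {
        // Offsets 0-9 have failed; 0-4 are still backing off, 5-9 are ready.
        Set<Long> ready = new TreeSet<>(Arrays.asList(5L, 6L, 7L, 8L, 9L));

        // Old behavior: seek to the committed offset (0). The poll only sees 0-4.
        System.out.println(retriedInOnePoll(0L, 5, 10L, ready)); // prints []

        // New behavior: seek to the earliest retriable offset (5).
        System.out.println(retriedInOnePoll(5L, 5, 10L, ready)); // prints [5, 6, 7, 8, 9]
    }
}
```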
    
    We could probably fix the broken behavior by trying really hard not to emit new tuples when we're ignoring maxUncommittedOffsets, but that seems like it would be error-prone and complicated to implement.
    
    I think we might be able to fix this by ensuring that we don't double-count retriable tuples. When the spout is deciding whether to poll, it should deduct retriable tuples from numUncommittedOffsets when comparing to maxUncommittedOffsets.
    
    Changing the poll check in this way is the same as enforcing the following constraint per partition, it seems to me:
    * Poll only if `numNonRetriableEmittedTuples < maxUncommittedOffsets`. If there are at least that many nonretriable tuples, the poll won't be allowed, because `numUncommittedOffsets = numRetriableTuples + numNonRetriableEmittedTuples`, so `numUncommittedOffsets - numRetriableTuples >= maxUncommittedOffsets`.
    
    This should mean that the limit on uncommitted tuples on each partition is going to be `maxUncommittedOffsets + maxPollRecords - 1`, because the latest tuple that can be retried on a partition is the one at offset `maxUncommittedOffsets`, where there are `maxUncommittedOffsets - 1` uncommitted tuples "to the left". If the retry poll starts at that offset, it at most emits the retried tuple plus `maxPollRecords - 1` new tuples.
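
The poll check and the resulting bound can be written down as two small functions. This is a sketch of the arithmetic only, with hypothetical names (`PollCheckSketch`, `shouldPoll`, `worstCaseUncommitted`); it does not model partitions or the consumer.

```java
final class PollCheckSketch {
    // Retriable tuples were already counted in numUncommittedOffsets when they were
    // first emitted, so they are deducted before comparing against the limit.
    static boolean shouldPoll(int numUncommittedOffsets, int numRetriableTuples,
                              int maxUncommittedOffsets) {
        return numUncommittedOffsets - numRetriableTuples < maxUncommittedOffsets;
    }

    // Worst case per partition: the last allowed tuple is retried and the same poll
    // also returns maxPollRecords - 1 new tuples.
    static int worstCaseUncommitted(int maxUncommittedOffsets, int maxPollRecords) {
        return maxUncommittedOffsets + maxPollRecords - 1;
    }

    public static void main(String[] args) {
        int maxUncommittedOffsets = 10;

        // 10 tuples emitted, none retriable: the limit is reached, no poll.
        System.out.println(shouldPoll(10, 0, maxUncommittedOffsets)); // prints false

        // 10 tuples emitted, the last one failed and is retriable: 9 < 10, poll.
        System.out.println(shouldPoll(10, 1, maxUncommittedOffsets)); // prints true

        // That poll re-emits the failed tuple plus up to 4 new ones when
        // maxPollRecords is 5, so 10 + 5 - 1 = 14 tuples can be uncommitted.
        System.out.println(worstCaseUncommitted(maxUncommittedOffsets, 5)); // prints 14
    }
}
```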
    
    There shouldn't be any problems when multiple partitions have retriable tuples, where retriable tuples on one partition might be able to cause a different partition to break the uncommitted offset limit. This is because a partition will at minimum contribute 0 to numUncommittedOffsets (e.g. if all uncommitted tuples on that partition are retriable), because any retriable tuples being subtracted were already counted in numUncommittedOffsets when the tuples were originally emitted.
    
    If we can enforce the limit on a per partition basis this way, there's no reason to worry about only emitting retriable tuples when we're exceeding maxUncommittedOffsets. 
    
    I don't think there's a need for pausing partitions anymore either. It was meant to prevent polling for new tuples when there were retriable tuples, but we're no longer trying to prevent that, since the per partition cap is already ensuring we won't emit too many tuples. Pausing in this case would prioritize retriable tuples over new tuples (e.g. in the case where an unpaused consumer might choose to fetch from a nonretriable partition even though there are retriable tuples), but might lead to lower throughput overall (in the case where there are not enough messages on the retriable partitions to fill a batch). I've removed it again.
    
    I've put up what I hope is the fix both here and on the 1.x branch. Sorry again that this has turned into such a moving target.


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r104774094
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -329,11 +350,13 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
                         }
                     
                         emitted.add(msgId);
    -                    numUncommittedOffsets++;
                     
                         if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
                             retryService.remove(msgId);
    -                    }
    +                    } else {
    +                    //New tuple, increment the uncommitted offset counter
    --- End diff --
    
    Will fix


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r104605596
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---
    @@ -0,0 +1,169 @@
    +/*
    --- End diff --
    
    We should have a separate PR for the 1.x branch as well, or change the current implementation to be compatible with JDK 7.


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    +1
    Once @hmcl has finished the review and there are no outstanding comments, I'll merge.
    
    Btw, regarding maxUncommittedOffsets: yes, it provides more guarantees, but if we guarantee that commits occur periodically and on time (we provide an option for that too), IMHO that's enough.
    
    The thing I have in mind is that committing in nextTuple() doesn't strictly guarantee that commits happen on time. Two situations come to mind: max spout pending has been reached, or backpressure is in place.
    
    If the spout has reached max spout pending, then since we emit **at most one tuple** per nextTuple() call, nextTuple() can be called again once the tuple tree for a tuple completes or a tuple fails. At the latest, nextTuple() will be called after the tuple timeout (actually up to 2 * tuple timeout due to the underlying implementation).
    So if users can tolerate this, it might be OK, but the actual commit interval could be far from the configured value.
    
    If backpressure is in place, nextTuple() will never be called. We can't guarantee any time frame here.
    
    If we want to strictly guarantee that commits occur on time, committing should be decoupled from the spout's lifecycle, e.g. driven by a timer. That would also require some handling of thread safety, so it's a bit more complicated, but IMO simpler than respecting maxUncommittedOffsets.
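
To make the timer idea concrete, here is a rough sketch of what committing outside the spout's lifecycle could look like. Everything in it is hypothetical (`TimerCommitSketch`, `ack`, `commitNow`, `startTimer`), and it deliberately simplifies: it tracks only the highest acked offset, whereas a real spout can only commit contiguous acked offsets, and it stores the committed offset in a field instead of calling Kafka. The point is the threading shape: a separate timer thread commits, and a shared lock protects state that the spout thread also touches. Since KafkaConsumer is documented as not thread-safe, a real version would need every consumer call, including poll, to go through the same lock.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class TimerCommitSketch {
    private final Object lock = new Object();
    private long highestAckedOffset = -1L; // guarded by lock
    private long committedOffset = -1L;    // guarded by lock

    // Called from the spout thread when a tuple is acked.
    void ack(long offset) {
        synchronized (lock) {
            highestAckedOffset = Math.max(highestAckedOffset, offset);
        }
    }

    // Called from the timer thread. A real implementation would commit to Kafka here.
    void commitNow() {
        synchronized (lock) {
            committedOffset = highestAckedOffset;
        }
    }

    long committedOffset() {
        synchronized (lock) {
            return committedOffset;
        }
    }

    // Starts a single-threaded timer that commits at a fixed rate,
    // independently of whether nextTuple() is being called.
    ScheduledExecutorService startTimer(long periodMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::commitNow, periodMs, periodMs, TimeUnit.MILLISECONDS);
        return timer;
    }

    public static void main(String[] args) {
        TimerCommitSketch sketch = new TimerCommitSketch();
        sketch.ack(3L);
        sketch.ack(7L);
        System.out.println(sketch.committedOffset()); // prints -1
        sketch.commitNow(); // what the timer thread would do on each tick
        System.out.println(sketch.committedOffset()); // prints 7
    }
}
```

The caller of `startTimer` is responsible for shutting the returned executor down, for example when the spout is closed.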


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @hmcl 
    I guess you're busy, but could you continue reviewing this one, or please let me know if you are not available for reviewing soon? I'd like to get this merged sooner since this is necessary for 1.1.0.


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    I think it would be best if we merged maxPollRecords and maxUncommittedOffsets, since having them be different has some undesirable side effects. @hmcl do you have an opinion?
    
    Also this drops support for Kafka 0.9 in KafkaSpoutConfig, since I believe https://github.com/apache/storm/pull/1556 already means that this release won't work with 0.9.


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r106542400
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java ---
    @@ -41,31 +41,22 @@ public static Config getConfig() {
     
         public static StormTopology getTopologyKafkaSpout(int port) {
             final TopologyBuilder tp = new TopologyBuilder();
    -        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(port)), 1);
    +        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfigBuilder(port).build()), 1);
    --- End diff --
    
    Why return a builder instead of the actual config?


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @srdo @HeartSaVioR I will do the last pass on this first thing in the morning so that we can merge it.


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r104611164
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---
    @@ -0,0 +1,245 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertThat;
    +import static org.mockito.Matchers.anyObject;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.reset;
    +import static org.mockito.Mockito.spy;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +
    +import info.batey.kafka.unit.KafkaUnitRule;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +import kafka.producer.KeyedMessage;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.utils.Time;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.mockito.ArgumentCaptor;
    +import org.mockito.MockitoAnnotations;
    +
    +public class MaxUncommittedOffsetTest {
    +
    +    @Rule
    +    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
    +
    +    private final TopologyContext topologyContext = mock(TopologyContext.class);
    +    private final Map<String, Object> conf = new HashMap<>();
    +    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
    +    private final long commitOffsetPeriodMs = 2_000;
    +    private final int numMessages = 100;
    +    private final int maxUncommittedOffsets = 10;
    +    //This is set to be the same as maxUncommittedOffsets since it is difficult to test maxUncommittedOffsets when maxPollRecords is not the same
    +    //If maxPollRecords is larger, a single call to poll will emit more than maxUncommittedOffsets messages
    +    //If maxPollRecords is lower, it will cap how far past the commit offset the spout can read when there are failed tuples ready for retry
    +    private final int maxPollRecords = maxUncommittedOffsets;
    +    private final int initialRetryDelaySecs = 60;
    +    private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaPort())
    +        .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
    +        .setMaxPollRecords(maxPollRecords)
    +        .setMaxUncommittedOffsets(maxUncommittedOffsets)
    +        .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
    +            1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute
    +        .build();
    +    private KafkaConsumer<String, String> consumerSpy;
    +    private KafkaConsumerFactory<String, String> consumerFactory;
    +    private KafkaSpout<String, String> spout;
    +
    +    @Before
    +    public void setUp() {
    +        MockitoAnnotations.initMocks(this);
    +        this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
    +        this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
    +        this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
    +    }
    +
    +    private void populateTopicData(String topicName, int msgCount) {
    +        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
    +
    +        IntStream.range(0, msgCount).forEach(value -> {
    +            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
    +                topicName, Integer.toString(value),
    +                Integer.toString(value));
    +
    +            kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
    +        });
    +    }
    +
    +    private void initializeSpout(int msgCount) {
    +        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
    +        spout.open(conf, topologyContext, collector);
    +        spout.activate();
    +    }
    +
    +    public ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) {
    --- End diff --
    
    Minor: It might be better to include an assertion that `messageCount >= maxUncommittedOffsets`.


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @hmcl @HeartSaVioR I think if we can't think of a good use case for maxUncommittedOffsets, we'd be better off removing it.


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99641026
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -289,6 +293,13 @@ private void doSeekRetriableTopicPartitions() {
                     kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1);    // Seek to last committed offset
                 }
             }
    +        
    +        if(!retriableTopicPartitions.isEmpty()) {
    +            //Pause other partitions for this poll so maxUncommittedPartitions isn't exceeded by too much
    --- End diff --
    
    I am a bit confused about this. When can it be exceeded? By how much? This should be a deterministic number.
    
    I read your comment in the email thread about maxSpoutPending. Is this related to that?


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99637679
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -27,13 +27,15 @@
     import java.util.Comparator;
     import java.util.HashSet;
     import java.util.Iterator;
    +import java.util.Objects;
     import java.util.Set;
     import java.util.TreeSet;
     import java.util.concurrent.TimeUnit;
    +import org.apache.storm.utils.Time;
     
     /**
      * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
    - * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1)    where failCount = 1, 2, 3, ...
    + * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1)    where failCount = 1, 2, 3, ...
      * nextRetry = Min(nextRetry, currentTime + maxDelay)
    --- End diff --
    
    The current implementation multiplies the delay by delayPeriod for each retry after the first. Say I set initialDelay to 1000 ms and delayPeriod to 1000 ms. The first retry is then scheduled 1 second after the failure, and the second 1 second after that. The third retry is scheduled 1000 seconds after that. The usual exponential backoff function multiplies the backoff by 2 for each failure, not by the backoff itself. The current formula is a geometric progression where you can only set the ratio, not the scale factor, which isn't very useful (for anything except tiny backoffs, it grows much too fast). If we want backoffs with multipliers other than 2, we could add another parameter for that (in which case the parameter should be an int, not a time period, IMO).
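    
    The arithmetic above can be checked with a small standalone sketch. It is not the code in KafkaSpoutRetryExponentialBackoff: the class and method names are hypothetical, delays are assumed to be plain millisecond values as in the example, and the Min(nextRetry, currentTime + maxDelay) cap is left out.
    
    ```java
    // Hypothetical sketch comparing the two backoff formulas discussed above.
    // Assumption: delays are plain millisecond counts; the maxDelay cap is omitted.
    final class BackoffSketch {
    
        // Old formula: initialDelay for the first failure, then delayPeriod^(failCount-1).
        static long oldDelayMs(long initialDelayMs, long delayPeriodMs, int failCount) {
            return failCount == 1
                ? initialDelayMs
                : (long) Math.pow(delayPeriodMs, failCount - 1);
        }
    
        // Proposed formula: initialDelay for the first failure, then delayPeriod*2^(failCount-1).
        static long newDelayMs(long initialDelayMs, long delayPeriodMs, int failCount) {
            return failCount == 1
                ? initialDelayMs
                : delayPeriodMs * (1L << (failCount - 1));
        }
    
        public static void main(String[] args) {
            // initialDelay = 1000 ms and delayPeriod = 1000 ms, as in the example above.
            for (int failCount = 1; failCount <= 3; failCount++) {
                System.out.printf("failCount=%d old=%d ms new=%d ms%n",
                    failCount,
                    oldDelayMs(1000, 1000, failCount),
                    newDelayMs(1000, 1000, failCount));
            }
        }
    }
    ```
    
    With these inputs the old formula yields 1,000,000 ms (1000 seconds) at the third failure, while the proposed one yields 4000 ms.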


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99642608
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -240,7 +240,9 @@ private boolean commit() {
     
         private boolean poll() {
             final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
    -        final boolean poll = !waitingToEmit() && numUncommittedOffsets < maxUncommittedOffsets;
    +        final boolean poll = !waitingToEmit() &&
    +            (numUncommittedOffsets < maxUncommittedOffsets ||
    +            !retryService.retriableTopicPartitions().isEmpty());
    --- End diff --
    
    No, I don't think so. The tuples need to be both failed and ready for retry for this to be true, and if it's true, the consumer will seek back to the last committed offset on the relevant topic partitions. The consumer pause in doSeekRetriableTopicPartitions ensures that we don't get messages for the partitions that don't have failed tuples. maxPollRecords then caps how far past the commit offset we can read on those partitions. If the same tuples are always failing, we'll never get more than maxPollRecords from the commit offset on those partitions as far as I can tell.
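    
    As a rough illustration, the poll condition in the diff above can be isolated as a pure function. The class name, method name and parameters below are hypothetical stand-ins for the spout's internal state (waitingToEmit(), numUncommittedOffsets, and whether retryService.retriableTopicPartitions() is non-empty); this is a sketch of the condition only, not the spout's actual code.
    
    ```java
    // Hypothetical sketch of the poll condition from the diff above.
    final class PollDecisionSketch {
    
        // Poll when nothing is waiting to be emitted and either the spout is under
        // the uncommitted-offsets limit or some failed tuples are ready for retry.
        static boolean shouldPoll(boolean waitingToEmit,
                                  long numUncommittedOffsets,
                                  int maxUncommittedOffsets,
                                  boolean hasRetriablePartitions) {
            return !waitingToEmit
                && (numUncommittedOffsets < maxUncommittedOffsets || hasRetriablePartitions);
        }
    
        public static void main(String[] args) {
            // At the limit with no retriable tuples: the spout does not poll.
            System.out.println(shouldPoll(false, 10, 10, false));
            // At the limit with retriable tuples: the spout polls so it can make progress.
            System.out.println(shouldPoll(false, 10, 10, true));
        }
    }
    ```
    
    Being at the limit only blocks polling when no failed tuples are ready for retry, which is why the limit becomes a soft cap.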


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @srdo apologies for the slight delay. I had a lot of deadlines last week. I will finish reviewing this today.


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @pavankumarp89 It should probably go in 1.2.0 or something like that. I definitely want it backported to 1.x, but I think we're trying to avoid API breakage in patch versions, and this PR changes the RetryService API.


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @srdo is this patch still relevant in light of the other patches that got merged?


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r104603040
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -270,17 +272,36 @@ public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
     
         // ======== poll =========
         private ConsumerRecords<K, V> pollKafkaBroker() {
    -        doSeekRetriableTopicPartitions();
    +        Set<TopicPartition> retriableTopicPartitions = doSeekRetriableTopicPartitions();
    +        Set<TopicPartition> pausedTopicPartitions = pauseNonRetriableTopicPartitionsIfAtUncommittedOffsetsLimit(retriableTopicPartitions);
             if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
                 kafkaSpoutConfig.getSubscription().refreshAssignment();
             }
             final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    --- End diff --
    
    Minor: I'd rather program defensively here (to guarantee that `resume` is called in every case), like below:
    ```
    try {
        final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
        final int numPolledRecords = consumerRecords.count();
        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets);
        return consumerRecords;
    } finally {
        if (!pausedTopicPartitions.isEmpty()) {
            //Ensure that we resume all partitions in case some were paused when processing retries
            kafkaConsumer.resume(kafkaConsumer.assignment());
        }
    }
    ```
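    
    The effect of the `finally` block can be shown in isolation with a stand-in consumer. Everything in this sketch is hypothetical (FakeConsumer, pollWithPause and the String partition names are not part of Kafka or Storm); it only demonstrates that paused partitions are resumed even when the poll throws.
    
    ```java
    import java.util.HashSet;
    import java.util.Set;
    
    // Hypothetical sketch: a fake consumer that records which partitions are paused.
    final class ResumeInFinallySketch {
    
        static final class FakeConsumer {
            final Set<String> paused = new HashSet<>();
    
            void pause(Set<String> partitions) {
                paused.addAll(partitions);
            }
    
            void resumeAll() {
                paused.clear();
            }
    
            // Returns a record count, or throws to simulate a failed poll.
            int poll(boolean fail) {
                if (fail) {
                    throw new IllegalStateException("simulated poll failure");
                }
                return 0;
            }
        }
    
        static int pollWithPause(FakeConsumer consumer, Set<String> toPause, boolean failPoll) {
            consumer.pause(toPause);
            try {
                return consumer.poll(failPoll);
            } finally {
                if (!toPause.isEmpty()) {
                    // Runs on both the normal and the exceptional path.
                    consumer.resumeAll();
                }
            }
        }
    }
    ```
    
    Without the `finally`, an exception from the poll would leave the partitions paused for later polls.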


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    This also changes the exponential backoff formula from currentTime + delayPeriod^(failCount-1) to currentTime + delayPeriod*2^(failCount-1). Multiplying the delay by itself causes the delay to grow extremely quickly, and probably wasn't intended.
    
    It might make sense to add jitter to the backoff as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r104774622
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---
    @@ -0,0 +1,245 @@
    +    public ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) {
    --- End diff --
    
    Will add


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r104777128
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---
    @@ -0,0 +1,245 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertThat;
    +import static org.mockito.Matchers.anyObject;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.reset;
    +import static org.mockito.Mockito.spy;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +
    +import info.batey.kafka.unit.KafkaUnitRule;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +import kafka.producer.KeyedMessage;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.utils.Time;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.mockito.ArgumentCaptor;
    +import org.mockito.MockitoAnnotations;
    +
    +public class MaxUncommittedOffsetTest {
    +
    +    @Rule
    +    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
    +
    +    private final TopologyContext topologyContext = mock(TopologyContext.class);
    +    private final Map<String, Object> conf = new HashMap<>();
    +    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
    +    private final long commitOffsetPeriodMs = 2_000;
    +    private final int numMessages = 100;
    +    private final int maxUncommittedOffsets = 10;
    +    //This is set to be the same as maxUncommittedOffsets since it is difficult to test maxUncommittedOffsets when maxPollRecords is not the same
    +    //If maxPollRecords is larger, a single call to poll will emit more than maxUncommittedOffsets messages
    +    //If maxPollRecords is lower, it will cap how far past the commit offset the spout can read when there are failed tuples ready for retry
    +    private final int maxPollRecords = maxUncommittedOffsets;
    +    private final int initialRetryDelaySecs = 60;
    +    private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaPort())
    +        .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
    +        .setMaxPollRecords(maxPollRecords)
    +        .setMaxUncommittedOffsets(maxUncommittedOffsets)
    +        .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
    +            1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute
    +        .build();
    +    private KafkaConsumer<String, String> consumerSpy;
    +    private KafkaConsumerFactory<String, String> consumerFactory;
    +    private KafkaSpout<String, String> spout;
    +
    +    @Before
    +    public void setUp() {
    +        MockitoAnnotations.initMocks(this);
    +        this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
    +        this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
    +        this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
    +    }
    +
    +    private void populateTopicData(String topicName, int msgCount) {
    +        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
    +
    +        IntStream.range(0, msgCount).forEach(value -> {
    +            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
    +                topicName, Integer.toString(value),
    +                Integer.toString(value));
    +
    +            kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
    +        });
    +    }
    +
    +    private void initializeSpout(int msgCount) {
    +        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
    +        spout.open(conf, topologyContext, collector);
    +        spout.activate();
    +    }
    +
    +    public ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) {
    +        //The spout must respect maxUncommittedOffsets when requesting/emitting tuples
    +        initializeSpout(messageCount);
    +
    +        //Try to emit all messages. Ensure only maxUncommittedOffsets are emitted
    +        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    +        for (int i = 0; i < messageCount; i++) {
    +            spout.nextTuple();
    +        };
    +        verify(collector, times(maxUncommittedOffsets)).emit(
    +            anyObject(),
    +            anyObject(),
    +            messageIds.capture());
    +        return messageIds;
    +    }
    +
    +    @Test
    +    public void testNextTupleCanEmitMoreMessagesWhenDroppingBelowMaxUncommittedOffsetsDueToCommit() {
    +        //The spout must respect maxUncommittedOffsets after committing a set of records
    +        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
    +            //First check that maxUncommittedOffsets is respected when emitting from scratch
    +            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
    +            reset(collector);
    +
    +            //Ack all emitted messages and commit them
    +            for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
    +                spout.ack(messageId);
    +            }
    +            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
    +            spout.nextTuple();
    +
    +            //Now check that the spout will emit another maxUncommittedOffsets messages
    +            for (int i = 0; i < numMessages; i++) {
    +                spout.nextTuple();
    +            }
    +            verify(collector, times(maxUncommittedOffsets)).emit(
    +                anyObject(),
    +                anyObject(),
    +                anyObject());
    +        }
    +    }
    +
    +    @Test
    +    public void testNextTupleWillRespectMaxUncommittedOffsetsWhenThereAreAckedUncommittedTuples() {
    +        //The spout must respect maxUncommittedOffsets even if some tuples have been acked but not committed
    +        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
    +            //First check that maxUncommittedOffsets is respected when emitting from scratch
    +            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
    +            reset(collector);
    +
    +            //Fail all emitted messages except the last one. Try to commit.
    +            List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
    +            for (int i = 0; i < messageIdList.size() - 1; i++) {
    +                spout.fail(messageIdList.get(i));
    +            }
    +            spout.ack(messageIdList.get(messageIdList.size() - 1));
    +            Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
    +            spout.nextTuple();
    +
    +            //Now check that the spout will not emit anything else since nothing has been committed
    +            for (int i = 0; i < numMessages; i++) {
    +                spout.nextTuple();
    +            }
    +
    +            verify(collector, times(0)).emit(
    +                anyObject(),
    +                anyObject(),
    +                anyObject());
    +        }
    +    }
    +
    +    private void failAllExceptTheFirstMessageThenCommit(ArgumentCaptor<KafkaSpoutMessageId> messageIds) {
    +        //Fail all emitted messages except the first. Commit the first.
    +        List<KafkaSpoutMessageId> messageIdList = messageIds.getAllValues();
    +        for (int i = 1; i < messageIdList.size(); i++) {
    +            spout.fail(messageIdList.get(i));
    +        }
    +        spout.ack(messageIdList.get(0));
    +        Time.advanceTime(commitOffsetPeriodMs + KafkaSpout.TIMER_DELAY_MS);
    +        spout.nextTuple();
    +    }
    +
    +    @Test
    +    public void testNextTupleWillNotEmitMoreThanMaxUncommittedOffsetsPlusMaxPollRecordsMessages() {
    +        //The upper bound on uncommitted offsets should be maxUncommittedOffsets + maxPollRecords
    +        //This is reachable by emitting maxUncommittedOffsets messages, acking the first message, then polling.
    +        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
    +            //First check that maxUncommittedOffsets is respected when emitting from scratch
    +            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
    +            reset(collector);
    +
    +            failAllExceptTheFirstMessageThenCommit(messageIds);
    +
    +            //Offset 0 is acked, 1 to maxUncommittedOffsets - 1 are failed
    +            //The spout should now emit another maxPollRecords messages
    +            //This is allowed because the acked message brings the numUncommittedOffsets below the cap
    +            for (int i = 0; i < maxUncommittedOffsets; i++) {
    +                spout.nextTuple();
    +            }
    +
    +            ArgumentCaptor<KafkaSpoutMessageId> secondRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    +            verify(collector, times(maxPollRecords)).emit(
    +                anyObject(),
    +                anyObject(),
    +                secondRunMessageIds.capture());
    +            reset(collector);
    +
    +            List<Long> firstRunOffsets = messageIds.getAllValues().stream()
    +                .map(messageId -> messageId.offset())
    +                .collect(Collectors.toList());
    +            List<Long> secondRunOffsets = secondRunMessageIds.getAllValues().stream()
    +                .map(messageId -> messageId.offset())
    +                .collect(Collectors.toList());
    +            assertThat("Expected the newly emitted messages to have no overlap with the first batch", secondRunOffsets.removeAll(firstRunOffsets), is(false));
    +
    +            //Offset 0 is acked, 1 to maxUncommittedOffsets-1 are failed, maxUncommittedOffsets to maxUncommittedOffsets + maxPollRecords-1 are emitted
    +            //There are now maxUncommittedOffsets-1 + maxPollRecords records emitted past the last committed offset
    +            //Retry the failed messages and ack the first one.
    +            Time.advanceTimeSecs(initialRetryDelaySecs);
    +            for (int i = 0; i < numMessages; i++) {
    +                spout.nextTuple();
    +            }
    +            ArgumentCaptor<KafkaSpoutMessageId> thirdRunMessageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    +            verify(collector, times(maxPollRecords-1)).emit(
    +                anyObject(),
    +                anyObject(),
    +                thirdRunMessageIds.capture());
    +            reset(collector);
    +
    +            failAllExceptTheFirstMessageThenCommit(thirdRunMessageIds);
    --- End diff --
    
    Will add a comparison


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r104775298
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---
    @@ -0,0 +1,245 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertThat;
    +import static org.mockito.Matchers.anyObject;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.reset;
    +import static org.mockito.Mockito.spy;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +
    +import info.batey.kafka.unit.KafkaUnitRule;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +import kafka.producer.KeyedMessage;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.utils.Time;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.mockito.ArgumentCaptor;
    +import org.mockito.MockitoAnnotations;
    +
    +public class MaxUncommittedOffsetTest {
    +
    +    @Rule
    +    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
    +
    +    private final TopologyContext topologyContext = mock(TopologyContext.class);
    +    private final Map<String, Object> conf = new HashMap<>();
    +    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
    +    private final long commitOffsetPeriodMs = 2_000;
    +    private final int numMessages = 100;
    +    private final int maxUncommittedOffsets = 10;
    +    //This is set to be the same as maxUncommittedOffsets since it is difficult to test maxUncommittedOffsets when maxPollRecords is not the same
    +    //If maxPollRecords is larger, a single call to poll will emit more than maxUncommittedOffsets messages
    +    //If maxPollRecords is lower, it will cap how far past the commit offset the spout can read when there are failed tuples ready for retry
    +    private final int maxPollRecords = maxUncommittedOffsets;
    +    private final int initialRetryDelaySecs = 60;
    +    private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaPort())
    +        .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
    +        .setMaxPollRecords(maxPollRecords)
    +        .setMaxUncommittedOffsets(maxUncommittedOffsets)
    +        .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
    +            1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute
    +        .build();
    +    private KafkaConsumer<String, String> consumerSpy;
    +    private KafkaConsumerFactory<String, String> consumerFactory;
    +    private KafkaSpout<String, String> spout;
    +
    +    @Before
    +    public void setUp() {
    +        MockitoAnnotations.initMocks(this);
    +        this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
    +        this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
    +        this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
    +    }
    +
    +    private void populateTopicData(String topicName, int msgCount) {
    +        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
    +
    +        IntStream.range(0, msgCount).forEach(value -> {
    +            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
    +                topicName, Integer.toString(value),
    +                Integer.toString(value));
    +
    +            kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
    +        });
    +    }
    +
    +    private void initializeSpout(int msgCount) {
    +        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
    +        spout.open(conf, topologyContext, collector);
    +        spout.activate();
    +    }
    +
    +    public ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) {
    +        //The spout must respect maxUncommittedOffsets when requesting/emitting tuples
    +        initializeSpout(messageCount);
    +
    +        //Try to emit all messages. Ensure only maxUncommittedOffsets are emitted
    +        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    +        for (int i = 0; i < messageCount; i++) {
    +            spout.nextTuple();
    +        }
    +        verify(collector, times(maxUncommittedOffsets)).emit(
    +            anyObject(),
    +            anyObject(),
    +            messageIds.capture());
    +        return messageIds;
    +    }
    +
    +    @Test
    +    public void testNextTupleCanEmitMoreMessagesWhenDroppingBelowMaxUncommittedOffsetsDueToCommit() {
    +        //The spout must respect maxUncommittedOffsets after committing a set of records
    +        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
    +            //First check that maxUncommittedOffsets is respected when emitting from scratch
    +            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
    --- End diff --
    
    Added a comment to numMessages



[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @hmcl I made the changes mentioned in my last comment. 
    
    As far as I can tell there's no way to get the default max.poll.records out of Kafka, so for now I've made do with adding comments in KafkaSpoutConfig recommending that maxUncommittedOffsets and maxPollRecords should be equal.



[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @srdo 
    Thanks for your patience. +1
    
    I can squash this and merge, but it seems like #2004 depends on this, so it would be clearer if you squash the commits for this PR (and for the 1.x branch #2022 too), and rebase #2004 after this is merged.
    
    Let me know when you finish squashing. Thanks again!



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99941130
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertThat;
    +import static org.mockito.Matchers.anyCollection;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Matchers.anyObject;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.reset;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.when;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.junit.Before;
    +import org.junit.Test;
    +import org.mockito.ArgumentCaptor;
    +
    +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
    +
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Time.SimulatedTime;
    +
    +public class KafkaSpoutEmitTest {
    +
    +    private final long offsetCommitPeriodMs = 2_000;
    +    private final TopologyContext contextMock = mock(TopologyContext.class);
    +    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
    +    private final Map<String, Object> conf = new HashMap<>();
    +    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
    +    private KafkaConsumer<String, String> consumerMock;
    +    private KafkaSpout<String, String> spout;
    +    private KafkaSpoutConfig spoutConfig;
    +
    +    private void setupSpout() {
    +        spoutConfig = getKafkaSpoutConfigBuilder(-1)
    +            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
    +            .build();
    +
    +        consumerMock = mock(KafkaConsumer.class);
    +        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
    +
    +        //Set up a spout listening to 1 topic partition
    +        spout = new KafkaSpout<>(spoutConfig)
    +            .setKafkaConsumerFactory(consumerFactory);
    +
    +        spout.open(conf, contextMock, collectorMock);
    +        spout.activate();
    +
    +        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
    +        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
    +
    +        //Assign partitions to the spout
    +        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
    +        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(partition));
    +    }
    +
    +    @Test
    +    public void shouldEmitAtMostOneMessagePerNextTupleCall() {
    --- End diff --
    
    Will rename.



[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @HeartSaVioR I agree, I think it would be much simpler to rely on maxSpoutPending. maxUncommittedOffsets does have a slightly different meaning though. With only maxSpoutPending, the spout might progress arbitrarily far past the last committed offset, since there's only a limit on how many tuples are in-flight at once, rather than on how many tuples have been emitted past the committed offset. If a user needs to cap how far the spout can read ahead for some reason, they can't do that with maxSpoutPending.
    
    I can't tell if that's a real need anyone has though. @hmcl any opinion?
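
The difference described above can be illustrated with a toy model. This is only a sketch and does not use any Storm or Kafka classes: the class name, the method names, and the assumption that offset 0 is never acked (so nothing can be committed) are all hypothetical and chosen for illustration.

```java
import java.util.TreeSet;

// Toy model, not KafkaSpout code: compares a limit on in-flight tuples with a
// limit on how far the spout may read past the last committed offset.
class ReadAheadSketch {

    // Only the number of in-flight tuples is limited (maxSpoutPending-like).
    // Offset 0 is never acked, so the committed position stays at 0.
    static long readAheadWithPendingLimit(int maxSpoutPending, int steps) {
        TreeSet<Long> inFlight = new TreeSet<>();
        long nextOffset = 0;
        long committedOffset = 0;
        for (int i = 0; i < steps; i++) {
            if (inFlight.size() < maxSpoutPending) {
                inFlight.add(nextOffset++);
            }
            // Every tuple except offset 0 is acked right away
            inFlight.removeIf(offset -> offset != 0);
        }
        return nextOffset - committedOffset;
    }

    // The distance past the committed position is limited (maxUncommittedOffsets-like).
    static long readAheadWithUncommittedCap(int maxUncommittedOffsets, int steps) {
        long nextOffset = 0;
        long committedOffset = 0;
        for (int i = 0; i < steps; i++) {
            if (nextOffset - committedOffset < maxUncommittedOffsets) {
                nextOffset++;
            }
        }
        return nextOffset - committedOffset;
    }

    public static void main(String[] args) {
        System.out.println("pending limit only: " + readAheadWithPendingLimit(5, 1000));
        System.out.println("uncommitted cap:    " + readAheadWithUncommittedCap(5, 1000));
    }
}
```

In this model a single stuck tuple does not stop the first variant from reading ahead, because the in-flight count stays low; the second variant stops once it is maxUncommittedOffsets past the committed position.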



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107547996
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---
    @@ -266,4 +269,36 @@ public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
                 verifyAllMessagesCommitted(messageCount);
             }
         }
    +    
    +    @Test
    +    public void shouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
    +        int messageCount = 10;
    +        initializeSpout(messageCount);
    +
    +        //play all tuples
    +        for (int i = 0; i < messageCount; i++) {
    +            spout.nextTuple();
    +        }
    +        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    +        verify(collector, times(messageCount)).emit(anyObject(), anyObject(), messageIds.capture());
    +        reset(collector);
    +        //Fail tuple 5 and 3, call nextTuple, then fail tuple 2
    +        List<KafkaSpoutMessageId> capturedMessageIds = messageIds.getAllValues();
    +        spout.fail(capturedMessageIds.get(5));
    +        spout.fail(capturedMessageIds.get(3));
    +        spout.nextTuple();
    --- End diff --
    
    Basically I wanted to test that the spout will emit each retriable tuple exactly once, even if the tuples fail in a weird order or Storm asks for more tuples in the middle of the failures.
    
    Added a comment to the test.
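
The property being tested can be sketched with a simplified retry schedule. This is a hypothetical stand-in, not the KafkaSpout or KafkaSpoutRetryService implementation: it assumes every failed offset is immediately ready for retry and that each nextTuple call re-emits at most one offset, removing it from the schedule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy retry schedule: an offset leaves the schedule when it is re-emitted,
// so each failure leads to exactly one retry regardless of failure order.
class RetryOnceSketch {
    private final TreeSet<Long> readyForRetry = new TreeSet<>();
    private final List<Long> emitted = new ArrayList<>();

    void fail(long offset) {
        readyForRetry.add(offset);
    }

    void nextTuple() {
        // Re-emit the earliest retriable offset, if any, and unschedule it
        Long offset = readyForRetry.pollFirst();
        if (offset != null) {
            emitted.add(offset);
        }
    }

    // Mirrors the order used in the test: fail 5 and 3, call nextTuple, then fail 2
    static List<Long> run() {
        RetryOnceSketch sketch = new RetryOnceSketch();
        sketch.fail(5);
        sketch.fail(3);
        sketch.nextTuple();
        sketch.fail(2);
        for (int i = 0; i < 10; i++) {
            sketch.nextTuple();
        }
        return sketch.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each of the three failed offsets appears once in the result even though a nextTuple call lands between the failures.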



[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @srdo Thanks for the note. I will take a look later today.



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99950879
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -289,6 +293,13 @@ private void doSeekRetriableTopicPartitions() {
                     kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1);    // Seek to last committed offset
                 }
             }
    +        
    +        if(!retriableTopicPartitions.isEmpty()) {
    +            //Pause other partitions for this poll so maxUncommittedOffsets isn't exceeded by too much
    --- End diff --
    
    I thought about it some more, and I don't think it's necessary. As long as we only have to guarantee that maxUncommittedOffsets isn't exceeded by more than maxPollRecords per partition, it should be fine to consume from all partitions. I'll remove this.
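
The soft cap can be sketched for a single partition as follows. This is a simplified model under stated assumptions, not the spout's code: it assumes the spout polls only while the uncommitted count is below maxUncommittedOffsets, that every poll returns a full maxPollRecords batch, and that exactly one offset is committed before the final poll. The class and method names are hypothetical.

```java
// Toy model of a soft cap: polling is gated on the counter, but a poll that
// is allowed may still return a full batch and push the counter past the cap.
class SoftCapSketch {

    static int peakUncommitted(int maxUncommittedOffsets, int maxPollRecords) {
        int uncommitted = 0;
        int peak = 0;
        // Emit from scratch until the gate closes
        while (uncommitted < maxUncommittedOffsets) {
            uncommitted += maxPollRecords;
            peak = Math.max(peak, uncommitted);
        }
        // Commit a single offset, then poll again if the gate reopened
        uncommitted -= 1;
        if (uncommitted < maxUncommittedOffsets) {
            uncommitted += maxPollRecords;
            peak = Math.max(peak, uncommitted);
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println(peakUncommitted(10, 10));
        System.out.println(peakUncommitted(10, 3));
    }
}
```

In both runs the peak stays within maxUncommittedOffsets - 1 + maxPollRecords, which matches the bound described in the test comments for MaxUncommittedOffsetTest.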




[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r106541885
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java ---
    @@ -41,31 +41,22 @@ public static Config getConfig() {
     
         public static StormTopology getTopologyKafkaSpout(int port) {
             final TopologyBuilder tp = new TopologyBuilder();
    -        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(port)), 1);
    +        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfigBuilder(port).build()), 1);
             tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
             return tp.createTopology();
         }
     
    -    static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(int port) {
    -        return getKafkaSpoutConfig(port, 10_000);
    -    }
    -
    -    static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(int port, long offsetCommitPeriodMs) {
    -        return getKafkaSpoutConfig(port, offsetCommitPeriodMs, getRetryService());
    -    }
    -
    -    static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
    +    static public KafkaSpoutConfig.Builder<String,String> getKafkaSpoutConfigBuilder(int port) {
    --- End diff --
    
    public static



[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    +1. 
    
    @srdo I forgot to mention this in the last review: can you please squash the commits? Once that is done, as far as I am concerned, this patch is ready to merge. 
    
    @HeartSaVioR can you please do your last pass? Thanks.



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107537729
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -172,19 +174,20 @@ public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval
         }
     
         @Override
    -    public Set<TopicPartition> retriableTopicPartitions() {
    -        final Set<TopicPartition> tps = new HashSet<>();
    +    public Map<TopicPartition, Long> earliestOffsetForEachRetriableTopicPartition() {
    --- End diff --
    
    Let's rename this method to `earliestRetriableOffsets`. The javadoc will provide the details. Let's not write very complex and long method names.
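
A minimal sketch of what a method with the proposed name could compute is shown here. It is not the KafkaSpoutRetryExponentialBackoff implementation: RetryEntry is a hypothetical stand-in for a scheduled retry, a plain String replaces Kafka's TopicPartition so the example has no dependencies, and treating an entry as ready when its retry time is no later than the current time is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class EarliestRetriableOffsetsSketch {

    // Hypothetical stand-in for one scheduled retry
    static final class RetryEntry {
        final String topicPartition;
        final long offset;
        final long nextRetryTimeMs;

        RetryEntry(String topicPartition, long offset, long nextRetryTimeMs) {
            this.topicPartition = topicPartition;
            this.offset = offset;
            this.nextRetryTimeMs = nextRetryTimeMs;
        }
    }

    // For every partition with at least one entry that is ready for retry,
    // returns the smallest ready offset on that partition.
    static Map<String, Long> earliestRetriableOffsets(List<RetryEntry> schedule, long nowMs) {
        Map<String, Long> earliest = new TreeMap<>();
        for (RetryEntry entry : schedule) {
            if (entry.nextRetryTimeMs <= nowMs) {
                earliest.merge(entry.topicPartition, entry.offset, Long::min);
            }
        }
        return earliest;
    }

    public static void main(String[] args) {
        List<RetryEntry> schedule = Arrays.asList(
            new RetryEntry("topic-0", 5, 100),
            new RetryEntry("topic-0", 3, 100),
            new RetryEntry("topic-0", 1, 500), // not ready yet at nowMs = 200
            new RetryEntry("topic-1", 7, 50));
        System.out.println(earliestRetriableOffsets(schedule, 200));
    }
}
```

Returning a map instead of a set of partitions lets the caller seek each partition directly to the offset it needs to replay from.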



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r106531182
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -268,12 +268,12 @@ private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class
             
             /**
              * The maximum number of records a poll will return.
    -         * Will only work with Kafka 0.10.0 and above.
    +         * This is limited by maxUncommittedOffsets, since it doesn't make sense to allow larger polls than the spout is allowed to emit.
    +         * Please note that when there are retriable tuples on a partition, maxPollRecords is an upper bound for how far the spout will read past the last committed offset on that partition.
    +         * It is recommended that users set maxUncommittedOffsets and maxPollRecords to be equal.
    --- End diff --
    
    @srdo this should not be the case. From our discussion, maxUncommittedOffsets should be much larger than maxPollRecords
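
One possible reading of this recommendation, offered as an assumption rather than something stated in the thread: if the soft cap can be overshot by up to maxPollRecords, the overshoot is small in relative terms only when maxUncommittedOffsets is much larger than maxPollRecords. The sketch below just does that arithmetic; the class name and the sample values 10_000 and 500 are hypothetical.

```java
// Arithmetic sketch: worst-case uncommitted offsets relative to the configured cap,
// assuming the upper bound is maxUncommittedOffsets + maxPollRecords.
class OvershootRatioSketch {

    static double worstCaseRatio(int maxUncommittedOffsets, int maxPollRecords) {
        int worstCase = maxUncommittedOffsets + maxPollRecords;
        return (double) worstCase / maxUncommittedOffsets;
    }

    public static void main(String[] args) {
        System.out.println("equal settings:  " + worstCaseRatio(10, 10));
        System.out.println("much larger cap: " + worstCaseRatio(10_000, 500));
    }
}
```

With equal settings the worst case is twice the configured cap, while a cap twenty times the poll size keeps the worst case within about five percent of it.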



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r104603153
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -329,11 +350,13 @@ private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
                         }
                     
                         emitted.add(msgId);
    -                    numUncommittedOffsets++;
                     
                         if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
                             retryService.remove(msgId);
    -                    }
    +                    } else {
    +                    //New tuple, increment the uncommitted offset counter
    --- End diff --
    
    nit: the indentation seems off here and on the lines below.
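
The counting change in this hunk can be sketched in isolation. This is a simplified model, not the KafkaSpout code: offsets stand in for message ids, two sets stand in for the emitted set and the retry service, and the class name is hypothetical. The point is that the counter goes up only for tuples that were not already scheduled for retry, because it is not decreased when a tuple fails.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the uncommitted offset counter with retries.
class UncommittedCounterSketch {
    private final Set<Long> scheduledForRetry = new HashSet<>();
    private final Set<Long> emitted = new HashSet<>();
    private int numUncommittedOffsets = 0;

    void emit(long offset) {
        // remove() returns true if the offset was scheduled for retry
        boolean isScheduled = scheduledForRetry.remove(offset);
        emitted.add(offset);
        if (!isScheduled) {
            // New tuple, increment the uncommitted offset counter
            numUncommittedOffsets++;
        }
    }

    void fail(long offset) {
        emitted.remove(offset);
        // The counter is intentionally left unchanged here
        scheduledForRetry.add(offset);
    }

    int numUncommittedOffsets() {
        return numUncommittedOffsets;
    }

    public static void main(String[] args) {
        UncommittedCounterSketch sketch = new UncommittedCounterSketch();
        sketch.emit(0);
        sketch.emit(1);
        sketch.fail(0);
        sketch.emit(0); // retry of offset 0, must not be counted twice
        System.out.println(sketch.numUncommittedOffsets());
    }
}
```

Incrementing unconditionally, as the removed line did, would count offset 0 twice in this sequence.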



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107545223
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java ---
    @@ -54,10 +55,11 @@
         boolean retainAll(Collection<TopicPartition> topicPartitions);
     
         /**
    -     * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
    -     * for which a tuple has failed and has retry time less than current time
    +     * @return The earliest retriable offset for each TopicPartition that has
    +     * offsets that are ready to be retried, i.e., for which a tuple has failed
    --- End diff --
    
    Will fix


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @HeartSaVioR @hmcl Do you guys have time to take another look? (again, very sorry about the number of times I've declared this "done")


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107545301
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---
    @@ -57,10 +57,9 @@
         //Current tests require that numMessages >= 2*maxUncommittedOffsets
         private final int numMessages = 100;
         private final int maxUncommittedOffsets = 10;
    -    //This is set to be the same as maxUncommittedOffsets since it is difficult to test maxUncommittedOffsets when maxPollRecords is not the same
    -    //If maxPollRecords is larger, a single call to poll will emit more than maxUncommittedOffsets messages
    -    //If maxPollRecords is lower, it will cap how far past the commit offset the spout can read when there are failed tuples ready for retry
    -    private final int maxPollRecords = maxUncommittedOffsets;
    +    //Current tests require that maxPollRecords is less than maxUncommittedOffsets
    --- End diff --
    
    It is still true. I'll add an assertion.


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99638933
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -78,7 +78,7 @@
         private transient Map<TopicPartition, OffsetManager> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
         private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
         private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
    -    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
    +    private transient int numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
    --- End diff --
    
    maxUncommittedOffsets is an int. This just makes it consistent.


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @hmcl I think this is ready for another look when you get a chance.


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107542426
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -172,19 +174,20 @@ public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval
         }
     
         @Override
    -    public Set<TopicPartition> retriableTopicPartitions() {
    -        final Set<TopicPartition> tps = new HashSet<>();
    +    public Map<TopicPartition, Long> earliestOffsetForEachRetriableTopicPartition() {
    --- End diff --
    
    Will rename


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99640360
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -240,7 +240,9 @@ private boolean commit() {
     
         private boolean poll() {
             final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
    -        final boolean poll = !waitingToEmit() && numUncommittedOffsets < maxUncommittedOffsets;
    +        final boolean poll = !waitingToEmit() &&
    +            (numUncommittedOffsets < maxUncommittedOffsets ||
    +            !retryService.retriableTopicPartitions().isEmpty());
    --- End diff --
    
    Can't the condition `!retryService.retriableTopicPartitions().isEmpty()` cause unbounded polling (i.e. continue forever) if some specific tuples always fail?
    
    The goal is to have a limit on how much can be polled (and hence kept in memory) in case of recurring failures.


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r104774243
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---
    @@ -0,0 +1,169 @@
    +/*
    --- End diff --
    
    I'll make a PR for 1.x soon


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99642616
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -289,6 +293,13 @@ private void doSeekRetriableTopicPartitions() {
                     kafkaConsumer.seek(rtp, acked.get(rtp).getCommittedOffset() + 1);    // Seek to last committed offset
                 }
             }
    +        
    +        if(!retriableTopicPartitions.isEmpty()) {
    +            //Pause other partitions for this poll so maxUncommittedPartitions isn't exceeded by too much
    --- End diff --
    
    No, I made a mistake there. maxSpoutPending is being enforced correctly. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99624959
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -27,13 +27,15 @@
     import java.util.Comparator;
     import java.util.HashSet;
     import java.util.Iterator;
    +import java.util.Objects;
     import java.util.Set;
     import java.util.TreeSet;
     import java.util.concurrent.TimeUnit;
    +import org.apache.storm.utils.Time;
     
     /**
      * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
    - * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1)    where failCount = 1, 2, 3, ...
    + * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1)    where failCount = 1, 2, 3, ...
      * nextRetry = Min(nextRetry, currentTime + maxDelay)
    --- End diff --
    
    The exponential backoff is a geometric progression with rate `delayPeriod`, and hence has a very well-known formula. Why change the formula? Why the constant factor of 2?
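
To make the two formulas in the Javadoc above easier to compare, here is a minimal sketch that evaluates both for a given `failCount`. The class and method names are hypothetical and are not part of `KafkaSpoutRetryExponentialBackoff`; the values are unitless and purely illustrative. The cap mirrors `nextRetry = Min(nextRetry, currentTime + maxDelay)`, expressed as a delay relative to the current time.

```java
final class BackoffSketch {
    // Delay per the original Javadoc: delayPeriod^(failCount-1) for failCount > 1.
    static long geometricDelay(long initialDelay, long delayPeriod, long maxDelay, int failCount) {
        if (failCount == 1) {
            return Math.min(initialDelay, maxDelay);
        }
        double raw = Math.pow(delayPeriod, failCount - 1);
        return (long) Math.min(raw, (double) maxDelay);
    }

    // Delay per the proposed Javadoc: delayPeriod * 2^(failCount-1) for failCount > 1.
    static long doublingDelay(long initialDelay, long delayPeriod, long maxDelay, int failCount) {
        if (failCount == 1) {
            return Math.min(initialDelay, maxDelay);
        }
        double raw = delayPeriod * Math.pow(2, failCount - 1);
        return (long) Math.min(raw, (double) maxDelay);
    }

    public static void main(String[] args) {
        // Print both delay sequences side by side for the first few failures.
        for (int failCount = 1; failCount <= 5; failCount++) {
            System.out.println(failCount
                + " geometric=" + geometricDelay(0, 2, 10_000, failCount)
                + " doubling=" + doublingDelay(0, 2, 10_000, failCount));
        }
    }
}
```

With `delayPeriod = 2` the two sequences differ by a constant factor of 2; for other values of `delayPeriod` they grow at different rates.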


---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99932405
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -329,11 +328,13 @@ private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class
                 this.offsetCommitPeriodMs = offsetCommitPeriodMs;
                 return this;
             }
    -        
    +
             /**
              * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
              * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
    -         * of pending offsets bellow the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
    +         * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
    +         * Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than 2*maxPollRecords.
    --- End diff --
    
    Agree


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r106687191
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -54,21 +55,28 @@
         private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
             @Override
             public int compare(RetrySchedule entry1, RetrySchedule entry2) {
    -            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            
    +            if(result == 0) {
    +                //TreeSet uses compareTo instead of equals() for the Set contract
    +                //Ensure that we can save two retry schedules with the same timestamp
    --- End diff --
    
    It's probably impossible in real life if System.nanoTime has sufficiently high resolution (not guaranteed to be any better than System.currentTimeMillis), but it can happen in tests that use simulated time. There's still potential for collisions if two retry schedules happen to have the same hashCode and timestamp, but that's very unlikely. I mostly made the change for the benefit of the tests, but it doesn't hurt to reduce the chance of collision in general.
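
The `TreeSet` behaviour described here can be reproduced with a small sketch. `Entry` is a hypothetical stand-in for `RetrySchedule`, and the tie-break uses the entry's id rather than `hashCode()`, purely for illustration.

```java
import java.util.Comparator;
import java.util.TreeSet;

final class TreeSetTieBreakSketch {
    // Hypothetical stand-in for RetrySchedule: an id plus a retry timestamp.
    static final class Entry {
        final String id;
        final long retryTime;

        Entry(String id, long retryTime) {
            this.id = id;
            this.retryTime = retryTime;
        }
    }

    // Compares on the timestamp only, like the comparator before the change.
    static final Comparator<Entry> BY_TIME_ONLY =
        (a, b) -> Long.compare(a.retryTime, b.retryTime);

    // Falls back to a second key when the timestamps are equal.
    static final Comparator<Entry> BY_TIME_THEN_ID = (a, b) -> {
        int result = Long.compare(a.retryTime, b.retryTime);
        return result != 0 ? result : a.id.compareTo(b.id);
    };

    // Adds two distinct entries with the same timestamp and reports how many the set kept.
    static int sizeAfterAdding(Comparator<Entry> comparator) {
        TreeSet<Entry> set = new TreeSet<>(comparator);
        set.add(new Entry("a", 100L));
        set.add(new Entry("b", 100L));
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println("time only: " + sizeAfterAdding(BY_TIME_ONLY));
        System.out.println("time then id: " + sizeAfterAdding(BY_TIME_THEN_ID));
    }
}
```

With the timestamp-only comparator the second entry is treated as a duplicate and silently dropped. If `hashCode()` is used as the tie-break, `Integer.compare(h1, h2)` is a safer choice than `h1 - h2`, since the subtraction can overflow for hash codes of opposite sign.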


---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @srdo what you mean is that if `numUncommittedOffsets > maxUncommittedOffsets`, the spout should still emit the retries, but not poll new records. Is that it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107542392
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -60,17 +61,19 @@ public int compare(RetrySchedule entry1, RetrySchedule entry2) {
                 if(result == 0) {
                     //TreeSet uses compareTo instead of equals() for the Set contract
                     //Ensure that we can save two retry schedules with the same timestamp
    -                result = entry1.hashCode() - entry2.hashCode();
    +                result = entry1.instanceNumber - entry2.instanceNumber;
    --- End diff --
    
    Will revert to using hashCode(). I'd like to at least keep that check, since it would be annoying in tests using simulated time if we had to add 1 nanosecond between every call to spout.fail().


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    Sorry to keep changing this. I hope these are the last few changes. The spout should now seek to the lowest retriable offset for each partition when retrying tuples, instead of seeking back to the last committed offset. As far as I can see, this means there is no longer any reason to worry about maxPollRecords being set too low compared to maxUncommittedOffsets, so I've reverted that part of KafkaSpoutConfig.
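
As a rough illustration of "the lowest retriable offset for each partition", the sketch below reduces a list of retry-ready tuples to one seek target per partition. `Retriable` and the use of plain strings as partition names are assumptions made to keep the example self-contained; the PR itself works with `TopicPartition` and `KafkaSpoutMessageId`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class EarliestRetriableOffsetSketch {
    // Hypothetical stand-in for a failed tuple that is ready to be retried.
    static final class Retriable {
        final String partition;
        final long offset;

        Retriable(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Keeps the smallest retry-ready offset seen for each partition.
    static Map<String, Long> earliestOffsets(List<Retriable> ready) {
        Map<String, Long> earliest = new HashMap<>();
        for (Retriable r : ready) {
            earliest.merge(r.partition, r.offset, Long::min);
        }
        return earliest;
    }

    public static void main(String[] args) {
        List<Retriable> ready = Arrays.asList(
            new Retriable("topic-0", 7L),
            new Retriable("topic-0", 4L),
            new Retriable("topic-1", 12L));
        System.out.println(earliestOffsets(ready));
    }
}
```

Each value in the resulting map is the offset the consumer would seek to for that partition before polling for retries.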


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @srdo I am reading through your detailed explanation again and will let you know my opinion concerning the need, or no need, for the parameter `maxUncommittedOffsets`, based on all the facts you state. 
    
    Prior to that, however, I will share right away what is the rationale behind `maxUncommittedOffsets` and why I think we still need it. 
    
    `maxUncommittedOffsets` is unrelated to `ConsumerConfig#MAX_POLL_RECORDS_CONFIG` and to `Config#TOPOLOGY_MAX_SPOUT_PENDING`. Its purpose is to allow the `KafkaSpout` to keep polling/fetching records from Kafka while imposing a (memory consumption) limit on how many offsets can be **polled and not committed over ANY ARBITRARY NUMBER of polls to Kafka (not just one poll)**, before the KafkaSpout stops polling. 
    
    This limit is necessary because if e.g. the tuple for offset **3** keeps failing, and all subsequent offsets **(4, 5, 6, ... 1M, ... 1B, ...)** are acked, the spout won't be able to commit to Kafka any offset greater than 3, unless it reaches **maxNumberOfRetrials**. If there is no limit on **maxNumberOfRetrials** (in order to guarantee at-least-once delivery), and offset 3 keeps failing forever, the spout won't ever be able to commit any offset > 3. In the limit this would cause the `Map<TopicPartition, OffsetManager> acked` to grow to infinite size (despite its small footprint, which consists mainly of offsets and some small bookkeeping objects; it doesn't hold `ConsumerRecord`s). 
    
    In order to put a cap on the size of this map, `maxUncommittedOffsets` was created to stop polling if this number is too high, and avoid `OutOfMemoryError`. **The purpose of the KafkaSpout continuing to poll even though a particular offset (tuple) keeps failing is to increase the overall throughput.** If at some point offset 3 is successfully acked, the longest run of contiguous acked offsets would be committed to Kafka in the next `Timer commitTimer` cycle, and the size of `Map<TopicPartition, OffsetManager> acked` would be reduced (potentially by a lot). The spout could then keep doing its work and the overall throughput would be much higher.
    
    `maxUncommittedOffsets` is not a hard limit parameter, and I don't think that we have to enforce that the KafkaSpout never exceeds this number. It is OK to exceed it as long as the JVM doesn't throw `OutOfMemoryError`. Currently, and prior to these proposed changes, in the worst case the number of uncommitted offsets is upper bounded by **maxUncommittedOffsets + maxFetchRecords - 1**, which is OK as long as there is memory. 
    
    Perhaps `maxUncommittedOffsets` is not the clearest name. We can either change the name (which can cause backward compatibility issues and/or confusion), or properly document what this parameter really means. The bottom line is that I think we need a parameter doing the job that `maxUncommittedOffsets` is currently doing, which I don't think can be accomplished with a combination of `ConsumerConfig#MAX_POLL_RECORDS_CONFIG` and/or `Config#TOPOLOGY_MAX_SPOUT_PENDING`.
    
    More precisely, the name `maxUncommittedOffsets` should be something like `allowPollingIfLessThanMaxUncommittedOffsets`.
    
    Please let me know your thoughts. I will still go over your explanation again in the meantime.
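
The "offset 3 keeps failing" scenario above can be sketched as follows. This is a simplified model, not the `OffsetManager` implementation: it assumes the committed position is simply the last offset of the unbroken run of acked offsets, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.TreeSet;

final class ContiguousAckSketch {
    // Returns the end of the unbroken run of acked offsets that starts
    // right after lastCommitted; a single missing offset blocks everything after it.
    static long highestCommittable(long lastCommitted, TreeSet<Long> acked) {
        long committable = lastCommitted;
        for (long offset : acked) {
            if (offset <= committable) {
                continue; // already covered by an earlier commit
            }
            if (offset != committable + 1) {
                break; // gap: an earlier offset has not been acked yet
            }
            committable = offset;
        }
        return committable;
    }

    public static void main(String[] args) {
        TreeSet<Long> acked = new TreeSet<>(Arrays.asList(4L, 5L, 6L, 7L));
        System.out.println("offset 3 pending: " + highestCommittable(2L, acked));
        acked.add(3L);
        System.out.println("offset 3 acked: " + highestCommittable(2L, acked));
    }
}
```

While offset 3 is missing, the acked set keeps growing but nothing can be committed, which is the growth that `maxUncommittedOffsets` is meant to bound.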



---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99634538
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -54,59 +56,92 @@
         private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
             @Override
             public int compare(RetrySchedule entry1, RetrySchedule entry2) {
    -            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            int result = Long.valueOf(entry1.nextRetryTimeMs()).compareTo(entry2.nextRetryTimeMs());
    +            
    +            if(result == 0) {
    +                //TreeSet uses compareTo instead of equals() for the Set contract
    +                //Ensure that we can save two retry schedules with the same timestamp
    +                result = entry1.hashCode() - entry2.hashCode();
    +            }
    +            return result;
             }
         }
     
         private class RetrySchedule {
             private final KafkaSpoutMessageId msgId;
    -        private long nextRetryTimeNanos;
    +        private long nextRetryTimeMs;
     
             public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
                 this.msgId = msgId;
    -            this.nextRetryTimeNanos = nextRetryTime;
    +            this.nextRetryTimeMs = nextRetryTime;
                 LOG.debug("Created {}", this);
             }
     
             public void setNextRetryTime() {
    -            nextRetryTimeNanos = nextTime(msgId);
    +            nextRetryTimeMs = nextTime(msgId);
                 LOG.debug("Updated {}", this);
             }
     
    -        public boolean retry(long currentTimeNanos) {
    -            return nextRetryTimeNanos <= currentTimeNanos;
    +        public boolean retry(long currentTimeMs) {
    +            return nextRetryTimeMs <= currentTimeMs;
             }
     
             @Override
             public String toString() {
                 return "RetrySchedule{" +
                         "msgId=" + msgId +
    -                    ", nextRetryTime=" + nextRetryTimeNanos +
    +                    ", nextRetryTimeMs=" + nextRetryTimeMs +
                         '}';
             }
     
    +        @Override
    +        public int hashCode() {
    +            int hash = 5;
    +            hash = 29 * hash + Objects.hashCode(this.msgId);
    +            return hash;
    +        }
    +
    +        @Override
    +        public boolean equals(Object obj) {
    +            if (this == obj) {
    +                return true;
    +            }
    +            if (obj == null) {
    +                return false;
    +            }
    +            if (getClass() != obj.getClass()) {
    +                return false;
    +            }
    +            final RetrySchedule other = (RetrySchedule) obj;
    +            if (!Objects.equals(this.msgId, other.msgId)) {
    +                return false;
    +            }
    +            return true;
    +        }
    +
             public KafkaSpoutMessageId msgId() {
                 return msgId;
             }
     
    -        public long nextRetryTimeNanos() {
    -            return nextRetryTimeNanos;
    +        public long nextRetryTimeMs() {
    +            return nextRetryTimeMs;
             }
         }
     
         public static class TimeInterval implements Serializable {
    -        private final long lengthNanos;
    -        private final long length;
    -        private final TimeUnit timeUnit;
    +        private final long lengthMs;
     
             /**
              * @param length length of the time interval in the units specified by {@link TimeUnit}
    -         * @param timeUnit unit used to specify a time interval on which to specify a time unit
    +         * @param timeUnit unit used to specify a time interval on which to specify a time unit. Smallest supported unit is milliseconds
              */
             public TimeInterval(long length, TimeUnit timeUnit) {
    -            this.length = length;
    -            this.timeUnit = timeUnit;
    -            this.lengthNanos = timeUnit.toNanos(length);
    +            
    +            if(timeUnit == TimeUnit.MICROSECONDS || timeUnit == TimeUnit.NANOSECONDS) {
    +                throw new IllegalArgumentException("TimeInterval does not support time units smaller than milliseconds");
    +            }
    --- End diff --
    
    Why?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    I think it would be good to get some of the assumptions I've made down in writing, so I just wrote down a more thorough explanation of the issues I think this PR should fix, and what changes I think it should make:
    
    The intention of this PR is to fix an issue where the spout has emitted maxUncommittedOffsets (or more) tuples, and some of them fail, and the spout then hangs because the `numUncommittedOffsets < maxUncommittedOffsets` check when deciding whether to poll doesn't account for retriable tuples. Blindly polling when there are retriable tuples will fix this issue, but leads to us not being able to bound the number of offsets the consumer has read past the last committed offset.
    
    For an example of a case where we can't put a bound on uncommitted offsets, say that partition 0 has maxUncommittedOffsets + maxPollRecords emitted tuples past the commit offset, and partition 1 has no emitted tuples (or any number below maxUncommittedOffsets, it doesn't matter).
    
    When tuples fail and become retriable on partition 0, the spout would blindly poll for more tuples. If it gets tuples from partition 0 it's okay, since it seeks back to the committed offset for that partition, so it won't go beyond committedOffset + maxPollRecords (since the retry logic seeks back to the committed offset on every poll, the spout can't go more than maxPollRecords beyond that offset). If it gets tuples from partition 1 we're effectively just emitting new tuples while ignoring maxUncommittedOffsets. Since we aren't controlling which partitions the polled tuples may come from, we can't say anything meaningful about a cap on uncommitted offsets. While I think it would probably work out to being capped anyway, due to the consumer consuming partitions round robin (AFAIK), I'd prefer if the spout implementation doesn't make assumptions beyond those guaranteed by the consumer API (in which case we should assume any call to `KafkaConsumer.poll` could return messages for any assigned non-paused partition).
    
    So the way to fix that issue is to ensure that if we're polling for retriable tuples, we only poll on those partitions that have retriable tuples (i.e. we pause the others when doing `doSeekRetriableTopicPartitions`, then resume all assigned after calling `KafkaConsumer.poll`). Pausing the other partitions should only affect that poll cycle, since once retriable tuples get emitted they're no longer retriable, and we won't hit `doSeekRetriableTopicPartitions` for those tuples again right away. In most polls (no failed tuples, or tuples are failed but not ready for retry), we won't hit the pausing case.
    
    If we pause partitions with no retriable tuples when polling on partitions with retriable tuples, we should be able to guarantee that any partition never gets more than maxUncommittedOffsets + maxPollRecords - 1 past the last committed offset. In the case where there are no failed tuples, we can reach the limit by having maxUncommittedOffsets - 1 emitted offsets, and polling once, getting up to maxPollRecords more. If there are retriable tuples, pausing will stop us from ignoring the maxUncommittedOffsets cap for partitions with no retriable tuples, and the partitions with retriable tuples won't get more than maxPollRecords beyond the last committed offset, since the consumer seeks back to that offset when polling for retriable offsets.
    
    There's a second minor issue I'd like this PR to address:
    If maxPollRecords isn't exactly equal to maxUncommittedOffsets, the spout can behave in some undesirable ways.
    * If maxPollRecords is greater than maxUncommittedOffsets, the maxUncommittedOffsets limit may be exceeded on any one poll. In this case there's no reason to have 2 separate variables, since the net effect is the same as setting maxUncommittedOffsets to be equal to maxPollRecords. 
    * If maxPollRecords is less than maxUncommittedOffsets, there's a risk of the spout getting stuck on some tuples for a while when it is retrying tuples.
      Say there are 10 retriable tuples following the last committed offset, and maxUncommittedOffsets is 10. If maxPollRecords is 5 and the first 5 retriable tuples are reemitted in the first batch, the next 5 tuples can't be emitted until (some of) the first 5 are acked. This is because the spout will seek the consumer back to the last committed offset any time there are failed tuples, which will lead to it getting the first 5 tuples out of the consumer, checking that they are emitted, and skipping them. This will repeat until the last committed offset moves. If there are other partitions with tuples available, those tuples may get emitted, but the "blocked" partition won't progress until some tuples are acked on it.
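
The stall described in the second bullet can be simulated with a few lines. This is a deliberately simplified model, assuming that every poll seeks back to the offset after the last commit, returns exactly `maxPollRecords` records, and that no tuple is acked between polls; the names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

final class SeekBackStallSketch {
    // Simulates repeated polls that always restart right after the committed offset.
    // Returns how many tuples were newly emitted across all polls.
    static int emittedAfterPolls(long committedOffset, int maxPollRecords, int polls, Set<Long> emitted) {
        int newlyEmitted = 0;
        for (int i = 0; i < polls; i++) {
            long start = committedOffset + 1; // seek back to the last committed offset
            for (long offset = start; offset < start + maxPollRecords; offset++) {
                // Offsets that are already in flight are skipped, not re-emitted.
                if (emitted.add(offset)) {
                    newlyEmitted++;
                }
            }
        }
        return newlyEmitted;
    }

    public static void main(String[] args) {
        System.out.println("maxPollRecords=5: " + emittedAfterPolls(0L, 5, 3, new HashSet<>()));
        System.out.println("maxPollRecords=10: " + emittedAfterPolls(0L, 10, 3, new HashSet<>()));
    }
}
```

With `maxPollRecords = 5`, only the first five offsets are ever emitted no matter how many polls follow, until the committed offset moves.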
      
    I think it might make sense to remove maxUncommittedOffsets entirely, and have the spout use maxPollRecords instead?
    
    The tl;dr is that I think we should allow polling if there are retriable offsets even if `numUncommittedOffsets >= maxUncommittedOffsets`, pause non-retriable partitions when polling for retries, and maybe merge maxUncommittedOffsets and maxPollRecords.
    
    @hmcl Could you take a look at this explanation and let me know if I got something wrong, or if you'd prefer to solve the issues in another way? I think it got a bit hypothetical.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99634404
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -54,59 +56,92 @@
         private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
             @Override
             public int compare(RetrySchedule entry1, RetrySchedule entry2) {
    -            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
    +            int result = Long.valueOf(entry1.nextRetryTimeMs()).compareTo(entry2.nextRetryTimeMs());
    +            
    +            if(result == 0) {
    +                //TreeSet uses compareTo instead of equals() for the Set contract
    +                //Ensure that we can save two retry schedules with the same timestamp
    +                result = entry1.hashCode() - entry2.hashCode();
    +            }
    +            return result;
             }
         }
     
         private class RetrySchedule {
             private final KafkaSpoutMessageId msgId;
    -        private long nextRetryTimeNanos;
    +        private long nextRetryTimeMs;
     
             public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
                 this.msgId = msgId;
    -            this.nextRetryTimeNanos = nextRetryTime;
    +            this.nextRetryTimeMs = nextRetryTime;
                 LOG.debug("Created {}", this);
             }
     
             public void setNextRetryTime() {
    -            nextRetryTimeNanos = nextTime(msgId);
    +            nextRetryTimeMs = nextTime(msgId);
                 LOG.debug("Updated {}", this);
             }
     
    -        public boolean retry(long currentTimeNanos) {
    -            return nextRetryTimeNanos <= currentTimeNanos;
    +        public boolean retry(long currentTimeMs) {
    +            return nextRetryTimeMs <= currentTimeMs;
             }
     
             @Override
             public String toString() {
                 return "RetrySchedule{" +
                         "msgId=" + msgId +
    -                    ", nextRetryTime=" + nextRetryTimeNanos +
    +                    ", nextRetryTimeMs=" + nextRetryTimeMs +
                         '}';
             }
     
    +        @Override
    +        public int hashCode() {
    --- End diff --
    
    Why did you decide to implement equals and hashCode?
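
For context on the comparator comment in the diff above, here is a minimal sketch of how a TreeSet treats two elements whose comparator result is 0. The Entry class is a hypothetical stand-in for RetrySchedule, and the tie-break on a string id is swapped in to keep the example deterministic; the diff itself breaks ties on hashCode.

```java
import java.util.Comparator;
import java.util.TreeSet;

final class TreeSetTieSketch {

    /** Hypothetical stand-in for a retry schedule entry. */
    static final class Entry {
        final String msgId;
        final long nextRetryTimeMs;

        Entry(String msgId, long nextRetryTimeMs) {
            this.msgId = msgId;
            this.nextRetryTimeMs = nextRetryTimeMs;
        }
    }

    // Compares on the timestamp only, so entries with equal timestamps look like duplicates.
    static final Comparator<Entry> BY_TIME_ONLY =
        Comparator.comparingLong((Entry e) -> e.nextRetryTimeMs);

    // Breaks timestamp ties on the id, so distinct entries never compare as 0.
    static final Comparator<Entry> BY_TIME_THEN_ID =
        BY_TIME_ONLY.thenComparing((Entry e) -> e.msgId);

    /** Adds two entries with the same timestamp and reports how many the set kept. */
    static int sizeAfterAddingTwoWithSameTimestamp(Comparator<Entry> comparator) {
        TreeSet<Entry> set = new TreeSet<>(comparator);
        set.add(new Entry("a", 100L));
        set.add(new Entry("b", 100L));
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterAddingTwoWithSameTimestamp(BY_TIME_ONLY));    // 1
        System.out.println(sizeAfterAddingTwoWithSameTimestamp(BY_TIME_THEN_ID)); // 2
    }
}
```

With the timestamp-only comparator the second entry is silently dropped, which is the kind of message loss the tie-break is meant to prevent.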



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99881564
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java ---
    @@ -0,0 +1,156 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertThat;
    +import static org.mockito.Matchers.anyCollection;
    +import static org.mockito.Matchers.anyLong;
    +import static org.mockito.Matchers.anyObject;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.reset;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +import static org.mockito.Mockito.when;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.junit.Before;
    +import org.junit.Test;
    +import org.mockito.ArgumentCaptor;
    +
    +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
    +
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Time.SimulatedTime;
    +
    +public class KafkaSpoutEmitTest {
    +
    +    private final long offsetCommitPeriodMs = 2_000;
    +    private final TopologyContext contextMock = mock(TopologyContext.class);
    +    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
    +    private final Map<String, Object> conf = new HashMap<>();
    +    private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
    +    private KafkaConsumer<String, String> consumerMock;
    +    private KafkaSpout<String, String> spout;
    +    private KafkaSpoutConfig spoutConfig;
    +
    +    private void setupSpout() {
    +        spoutConfig = getKafkaSpoutConfigBuilder(-1)
    +            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
    +            .build();
    +
    +        consumerMock = mock(KafkaConsumer.class);
    +        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
    +
    +        //Set up a spout listening to 1 topic partition
    +        spout = new KafkaSpout<>(spoutConfig)
    +            .setKafkaConsumerFactory(consumerFactory);
    +
    +        spout.open(conf, contextMock, collectorMock);
    +        spout.activate();
    +
    +        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
    +        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
    +
    +        //Assign partitions to the spout
    +        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
    +        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(partition));
    +    }
    +
    +    @Test
    +    public void shouldEmitAtMostOneMessagePerNextTupleCall() {
    --- End diff --
    
    I suggest the method name `testNextTuple_emitAtMostOneTuple`, similar to the other methods, but it's up to you.



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r104611695
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---
    @@ -0,0 +1,245 @@
    +/*
    + * Copyright 2017 The Apache Software Foundation.
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.kafka.spout;
    +
    +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
    +import static org.hamcrest.CoreMatchers.is;
    +import static org.junit.Assert.assertThat;
    +import static org.mockito.Matchers.anyObject;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.reset;
    +import static org.mockito.Mockito.spy;
    +import static org.mockito.Mockito.times;
    +import static org.mockito.Mockito.verify;
    +
    +import info.batey.kafka.unit.KafkaUnitRule;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.stream.Collectors;
    +import java.util.stream.IntStream;
    +import kafka.producer.KeyedMessage;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
    +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.utils.Time;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.mockito.ArgumentCaptor;
    +import org.mockito.MockitoAnnotations;
    +
    +public class MaxUncommittedOffsetTest {
    +
    +    @Rule
    +    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
    +
    +    private final TopologyContext topologyContext = mock(TopologyContext.class);
    +    private final Map<String, Object> conf = new HashMap<>();
    +    private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
    +    private final long commitOffsetPeriodMs = 2_000;
    +    private final int numMessages = 100;
    +    private final int maxUncommittedOffsets = 10;
    +    //This is set to be the same as maxUncommittedOffsets since it is difficult to test maxUncommittedOffsets when maxPollRecords is not the same
    +    //If maxPollRecords is larger, a single call to poll will emit more than maxUncommittedOffsets messages
    +    //If maxPollRecords is lower, it will cap how far past the commit offset the spout can read when there are failed tuples ready for retry
    +    private final int maxPollRecords = maxUncommittedOffsets;
    +    private final int initialRetryDelaySecs = 60;
    +    private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaPort())
    +        .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
    +        .setMaxPollRecords(maxPollRecords)
    +        .setMaxUncommittedOffsets(maxUncommittedOffsets)
    +        .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
    +            1, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(initialRetryDelaySecs))) //Retry once after a minute
    +        .build();
    +    private KafkaConsumer<String, String> consumerSpy;
    +    private KafkaConsumerFactory<String, String> consumerFactory;
    +    private KafkaSpout<String, String> spout;
    +
    +    @Before
    +    public void setUp() {
    +        MockitoAnnotations.initMocks(this);
    +        this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
    +        this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
    +        this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
    +    }
    +
    +    private void populateTopicData(String topicName, int msgCount) {
    +        kafkaUnitRule.getKafkaUnit().createTopic(topicName);
    +
    +        IntStream.range(0, msgCount).forEach(value -> {
    +            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
    +                topicName, Integer.toString(value),
    +                Integer.toString(value));
    +
    +            kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
    +        });
    +    }
    +
    +    private void initializeSpout(int msgCount) {
    +        populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
    +        spout.open(conf, topologyContext, collector);
    +        spout.activate();
    +    }
    +
    +    public ArgumentCaptor<KafkaSpoutMessageId> emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(int messageCount) {
    +        //The spout must respect maxUncommittedOffsets when requesting/emitting tuples
    +        initializeSpout(messageCount);
    +
    +        //Try to emit all messages. Ensure only maxUncommittedOffsets are emitted
    +        ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
    +        for (int i = 0; i < messageCount; i++) {
    +            spout.nextTuple();
    +        };
    +        verify(collector, times(maxUncommittedOffsets)).emit(
    +            anyObject(),
    +            anyObject(),
    +            messageIds.capture());
    +        return messageIds;
    +    }
    +
    +    @Test
    +    public void testNextTupleCanEmitMoreMessagesWhenDroppingBelowMaxUncommittedOffsetsDueToCommit() {
    +        //The spout must respect maxUncommittedOffsets after committing a set of records
    +        try (Time.SimulatedTime simulatedTime = new Time.SimulatedTime()) {
    +            //First check that maxUncommittedOffsets is respected when emitting from scratch
    +            ArgumentCaptor<KafkaSpoutMessageId> messageIds = emitMaxUncommittedOffsetsMessagesAndCheckNoMoreAreEmitted(numMessages);
    --- End diff --
    
    Minor: the precondition here is that `numMessages >= 2 * maxUncommittedOffsets`.



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r107532327
  
    --- Diff: external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java ---
    @@ -57,10 +57,9 @@
         //Current tests require that numMessages >= 2*maxUncommittedOffsets
         private final int numMessages = 100;
         private final int maxUncommittedOffsets = 10;
    -    //This is set to be the same as maxUncommittedOffsets since it is difficult to test maxUncommittedOffsets when maxPollRecords is not the same
    -    //If maxPollRecords is larger, a single call to poll will emit more than maxUncommittedOffsets messages
    -    //If maxPollRecords is lower, it will cap how far past the commit offset the spout can read when there are failed tuples ready for retry
    -    private final int maxPollRecords = maxUncommittedOffsets;
    +    //Current tests require that maxPollRecords is less than maxUncommittedOffsets
    --- End diff --
    
    Is this still true? If so, let's put an assertion validating this in the @Before method or something.
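
A minimal sketch of such a check, written as plain Java so it stands alone. The helper name is hypothetical, and in the actual test it would more likely be a JUnit assertion inside the @Before method. The two conditions are the ones stated in the test's comments; the maxPollRecords value of 5 is an assumed example.

```java
final class TestPreconditionsSketch {

    /** Fails fast if the constants no longer satisfy what the tests rely on. */
    static void checkPreconditions(int numMessages, int maxUncommittedOffsets, int maxPollRecords) {
        if (numMessages < 2 * maxUncommittedOffsets) {
            throw new IllegalStateException(
                "Tests require numMessages >= 2 * maxUncommittedOffsets");
        }
        if (maxPollRecords >= maxUncommittedOffsets) {
            throw new IllegalStateException(
                "Tests require maxPollRecords < maxUncommittedOffsets");
        }
    }

    public static void main(String[] args) {
        // 100 and 10 are the values from the test; 5 is an assumed maxPollRecords.
        checkPreconditions(100, 10, 5);
        System.out.println("preconditions hold");
    }
}
```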



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99639068
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -78,7 +78,7 @@
         private transient Map<TopicPartition, OffsetManager> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
         private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
         private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
    -    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
    +    private transient int numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
    --- End diff --
    
    Actually, you're right. Since the spout may emit more than maxUncommittedOffsets, this should be changed back to long.



[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    Thanks for the reviews, guys.



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99869121
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---
    @@ -27,13 +27,15 @@
     import java.util.Comparator;
     import java.util.HashSet;
     import java.util.Iterator;
    +import java.util.Objects;
     import java.util.Set;
     import java.util.TreeSet;
     import java.util.concurrent.TimeUnit;
    +import org.apache.storm.utils.Time;
     
     /**
      * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
    - * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1)    where failCount = 1, 2, 3, ...
    + * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1)    where failCount = 1, 2, 3, ...
      * nextRetry = Min(nextRetry, currentTime + maxDelay)
    --- End diff --
    
    OK.
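
The corrected Javadoc formula in the diff above can be sketched as follows. This is an illustration of the formula as written in the comment, not the class's actual implementation; the method and parameter names are made up, and there is no guard against overflow for very large fail counts.

```java
final class BackoffFormulaSketch {

    /**
     * nextRetry = failCount == 1 ? currentTime + initialDelay
     *                            : currentTime + delayPeriod * 2^(failCount - 1),
     * capped at currentTime + maxDelay.
     */
    static long nextRetryMs(long currentTimeMs, int failCount,
                            long initialDelayMs, long delayPeriodMs, long maxDelayMs) {
        long delayMs = failCount == 1
            ? initialDelayMs
            : delayPeriodMs * (1L << (failCount - 1));
        // min(now + delay, now + maxDelay) is the same as now + min(delay, maxDelay).
        return currentTimeMs + Math.min(delayMs, maxDelayMs);
    }

    public static void main(String[] args) {
        long now = 1_000;
        System.out.println(nextRetryMs(now, 1, 500, 200, 10_000));  // 1500
        System.out.println(nextRetryMs(now, 2, 500, 200, 10_000));  // 1400
        System.out.println(nextRetryMs(now, 3, 500, 200, 10_000));  // 1800
        System.out.println(nextRetryMs(now, 10, 500, 200, 10_000)); // 11000
    }
}
```

The last call shows the cap: 200 * 2^9 = 102400 ms exceeds maxDelay, so the retry lands at now + 10000.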



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1924#discussion_r99635403
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -78,7 +78,7 @@
         private transient Map<TopicPartition, OffsetManager> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
         private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
         private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
    -    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
    +    private transient int numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
    --- End diff --
    
    Why impose a more restricted data type? What if the user desires a large number?



[GitHub] storm pull request #1924: STORM-2343: New Kafka spout can stop emitting tupl...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1924



[GitHub] storm issue #1924: STORM-2343: New Kafka spout can stop emitting tuples if m...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on the issue:

    https://github.com/apache/storm/pull/1924
  
    @HeartSaVioR @srdo reviewing this right now.

