Posted to dev@storm.apache.org by vivekmittal <gi...@git.apache.org> on 2017/05/08 05:18:48 UTC

[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

GitHub user vivekmittal opened a pull request:

    https://github.com/apache/storm/pull/2104

    [STORM-2505] Spout to support topic compaction

    Kafka stores the spout's progress (the committed offset for each partition), and that offset, or the offset immediately after it (offset+1), may no longer exist in the topic for either of the following reasons:

    - The topology stopped processing (or died) and the topic was compacted (cleanup.policy=compact), leaving offset voids in the topic.
    - The topology stopped processing (or died) and the topic was cleaned up (cleanup.policy=delete), removing the offset.

    When the topology resumes processing (or is restarted), the spout logic requires the next offset to commit to be (committedOffset+1) before it can make progress. That can never happen, because (committedOffset+1) has been removed from the topic and will therefore never be acked.
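To make the "offset voids" concrete, the sketch below models key-based compaction on a single partition. It assumes each record is just an (offset, key) pair and that compaction keeps only the highest offset per key; it ignores tombstones, segment boundaries and the uncompacted active segment. `CompactionSketch` and `compact` are hypothetical names, not Kafka's log cleaner.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical model of key-based compaction on one partition (not Kafka's LogCleaner):
// for each key, only the record with the highest offset survives.
final class CompactionSketch {

    /** offsets must be ascending; keys.get(i) is the key of the record at offsets.get(i). */
    static TreeSet<Long> compact(List<Long> offsets, List<String> keys) {
        Map<String, Long> latestByKey = new HashMap<>();
        for (int i = 0; i < offsets.size(); i++) {
            latestByKey.put(keys.get(i), offsets.get(i)); // a later record overwrites an earlier one
        }
        return new TreeSet<>(latestByKey.values()); // surviving offsets, ascending
    }

    public static void main(String[] args) {
        List<Long> offsets = Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L);
        List<String> keys = Arrays.asList("a", "b", "a", "c", "a", "d");
        // Offsets 0 and 2 are superseded by offset 4 (same key "a") and disappear.
        System.out.println(compact(offsets, keys)); // prints [1, 3, 4, 5]
    }
}
```

If the spout had committed offset 1 before compaction ran, committedOffset + 1 = 2 no longer exists, which is exactly the situation described above.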
    
    **OffsetManager.java**
    ```java
    if (currOffset == nextCommitOffset + 1) {            // found the next offset to commit
        found = true;
        nextCommitMsg = currAckedMsg;
        nextCommitOffset = currOffset;
    } else if (currOffset > nextCommitOffset + 1) {      // offset found is not continuous, so stop search
        LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
        break;
    }
    ```
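The pre-patch rule can be isolated in a few lines. This is a hypothetical sketch (`ContiguousScanSketch` is not a Storm class) that assumes acked offsets are kept as plain longs in a sorted set instead of `KafkaSpoutMessageId` objects; it shows that the commit point never moves once (committedOffset+1) has been compacted away.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical stand-in for the pre-patch OffsetManager scan: only contiguous acks advance the commit point.
final class ContiguousScanSketch {

    /** Returns the highest committable offset; returns committedOffset unchanged when no progress is possible. */
    static long nextCommitOffset(long committedOffset, NavigableSet<Long> ackedOffsets) {
        long next = committedOffset;
        for (long curr : ackedOffsets) {
            if (curr == next + 1) {
                next = curr;          // contiguous ack: extend the commit point
            } else if (curr > next + 1) {
                break;                // gap: wait for (next + 1) to be acked
            }
            // curr <= next: already covered by the commit point, keep scanning
        }
        return next;
    }

    public static void main(String[] args) {
        // Offset 2 was compacted away, so it is never emitted and never acked.
        NavigableSet<Long> acked = new TreeSet<>(Arrays.asList(3L, 4L, 5L));
        System.out.println(nextCommitOffset(1L, acked)); // prints 1: the spout is stuck
    }
}
```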
    
    This pull request maintains a set of emitted offsets in OffsetManager and uses it to advance the commit offset to the next logical offset when (committedOffset+1) no longer exists in the topic.
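A minimal sketch of the idea, assuming emitted and acked offsets are both kept as sorted sets of longs (every acked offset was also emitted). When (nextCommitOffset + 1) was never emitted, the scan jumps to the smallest emitted offset above the commit point, and the result of `ceiling()` is null-checked. `GapAwareScanSketch` is a hypothetical name; this is an illustration, not the merged `OffsetManager` code.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical sketch of the gap-aware scan; offsets are plain longs, not KafkaSpoutMessageId.
final class GapAwareScanSketch {

    static long nextCommitOffset(long committedOffset,
                                 NavigableSet<Long> emittedOffsets,
                                 NavigableSet<Long> ackedOffsets) {
        long next = committedOffset;
        for (long curr : ackedOffsets.tailSet(committedOffset, false)) {
            if (curr == next + 1) {
                next = curr; // contiguous ack
            } else if (emittedOffsets.contains(next + 1)) {
                break; // (next + 1) was emitted but is not acked yet: wait for it
            } else {
                // (next + 1) was never emitted, so it is assumed to be gone from the topic.
                Long nextEmitted = emittedOffsets.ceiling(next + 1); // null if nothing qualifies
                if (nextEmitted != null && curr == nextEmitted) {
                    next = curr; // jump over the void
                } else {
                    break; // an earlier emitted offset is still unacked
                }
            }
        }
        return next;
    }

    public static void main(String[] args) {
        NavigableSet<Long> emitted = new TreeSet<>(Arrays.asList(3L, 4L, 5L)); // offset 2 is gone
        NavigableSet<Long> acked = new TreeSet<>(Arrays.asList(3L, 5L));       // offset 4 still in flight
        System.out.println(nextCommitOffset(1L, emitted, acked)); // prints 3
    }
}
```

The `contains(next + 1)` check keeps the scan conservative: a gap is skipped only if the missing offset was never emitted, so an offset that is merely still in flight continues to block the commit.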

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vivekmittal/storm master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2104.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2104
    
----
commit acfd13b9c71f6cf4d7307aa1598e8b50b06b0f37
Author: Vivek Mittal <vi...@flipkart.com>
Date:   2017-05-08T05:13:36Z

    [STORM-2505] Spout to support topic compaction. Maintain an emitted-offset set in OffsetManager to handle voids in the topic

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115412857
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -68,13 +74,34 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
     
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
    -            if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
    +            currOffset = currAckedMsg.offset();
    +            if (currOffset == nextCommitOffset + 1) {            // found the next offset to commit
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
    -            } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
    -                LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    -                break;
    +            } else if (currOffset > nextCommitOffset + 1) {
    +                if (emittedOffsets.contains(nextCommitOffset + 1)) {
    +                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    +                    break;
    +                } else {
    +                    /*
    +                        This case will arise in case of non contiguous offset being processed.
    +                        So, if the queue doesn't contain offset = committedOffset + 1 (possible
    +                        if the queue is compacted or deleted), the consumer should jump to
    --- End diff --
    
    Minor nit: "queue" => "topic"



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115317728
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -113,6 +140,15 @@ public long commit(OffsetAndMetadata committedOffset) {
                     break;
                 }
             }
    +
    +        for (Iterator<Long> iterator = emittedOffsets.iterator(); iterator.hasNext();) {
    --- End diff --
    
    Which scenario is this cleanup code accounting for?



[GitHub] storm issue #2104: [STORM-2505] Spout to support topic compaction

Posted by vivekmittal <gi...@git.apache.org>.
Github user vivekmittal commented on the issue:

    https://github.com/apache/storm/pull/2104
  
    Squashed the commits into one.



[GitHub] storm issue #2104: [STORM-2505] Spout to support topic compaction

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/2104
  
    @vivekmittal Overall LGTM. I am +1 once the method name is addressed.
    Thanks for finding & addressing the bug.



[GitHub] storm issue #2104: [STORM-2505] Spout to support topic compaction

Posted by vivekmittal <gi...@git.apache.org>.
Github user vivekmittal commented on the issue:

    https://github.com/apache/storm/pull/2104
  
    Raised a PR to port the same changes to 1.x-branch:
    https://github.com/apache/storm/pull/2107



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115320154
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -68,13 +74,34 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
     
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
    -            if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
    +            currOffset = currAckedMsg.offset();
    +            if (currOffset == nextCommitOffset + 1) {            // found the next offset to commit
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
    -            } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
    -                LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    -                break;
    +            } else if (currOffset > nextCommitOffset + 1) {
    +                if (emittedOffsets.contains(nextCommitOffset + 1)) {
    +                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    +                    break;
    +                } else {
    +                    /*
    +                        This case will arise in case of non contiguous offset being processed.
    +                        So, if the queue doesn't contain offset = committedOffset + 1 (possible
    +                        if the queue is compacted or deleted), the consumer should jump to
    +                        the next logical point in the queue. Next logical offset should be the
    +                        first element after committedOffset in the ascending ordered emitted set.
    +                     */
    +                    LOG.debug("Processed non contiguous offset, the previously committed offset has been deleted from the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset);
    +                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
    --- End diff --
    
    Nice catch. L96 looks like it will throw an NPE when Java tries to unbox a null nextEmittedOffset.
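The NPE risk is a general Java pitfall: `NavigableSet.ceiling` returns null when no element is greater than or equal to its argument, and comparing or assigning that `Long` to a primitive `long` auto-unboxes it. The sketch below, with hypothetical method names and assuming the flagged line compares the result against a primitive, shows the failure and a null-safe alternative.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

final class CeilingUnboxingSketch {

    /** Unsafe: if ceiling() returns null, the == comparison unboxes null and throws NullPointerException. */
    static boolean matchesUnsafe(NavigableSet<Long> emitted, long from, long currOffset) {
        final Long nextEmittedOffset = emitted.ceiling(from);
        return currOffset == nextEmittedOffset; // unboxes nextEmittedOffset
    }

    /** Null-safe: check for null before the primitive comparison. */
    static boolean matchesSafe(NavigableSet<Long> emitted, long from, long currOffset) {
        final Long nextEmittedOffset = emitted.ceiling(from);
        return nextEmittedOffset != null && currOffset == nextEmittedOffset;
    }

    public static void main(String[] args) {
        NavigableSet<Long> emitted = new TreeSet<>(Arrays.asList(3L, 4L));
        System.out.println(matchesSafe(emitted, 10L, 12L)); // prints false: ceiling(10) is null
        try {
            matchesUnsafe(emitted, 10L, 12L);
        } catch (NullPointerException e) {
            System.out.println("NPE from unboxing a null ceiling() result");
        }
    }
}
```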



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by vivekmittal <gi...@git.apache.org>.
Github user vivekmittal commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115406125
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -68,13 +74,34 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
     
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
    -            if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
    +            currOffset = currAckedMsg.offset();
    +            if (currOffset == nextCommitOffset + 1) {            // found the next offset to commit
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
    -            } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
    -                LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    -                break;
    +            } else if (currOffset > nextCommitOffset + 1) {
    +                if (emittedOffsets.contains(nextCommitOffset + 1)) {
    +                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    +                    break;
    +                } else {
    +                    /*
    +                        This case will arise in case of non contiguous offset being processed.
    +                        So, if the queue doesn't contain offset = committedOffset + 1 (possible
    +                        if the queue is compacted or deleted), the consumer should jump to
    +                        the next logical point in the queue. Next logical offset should be the
    +                        first element after committedOffset in the ascending ordered emitted set.
    +                     */
    +                    LOG.debug("Processed non contiguous offset, the previously committed offset has been deleted from the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset);
    --- End diff --
    
    Yes, the offset was there because it was part of the topic, but there is no guarantee that it will always be there: offsets can be removed by the compaction or deletion process of a Kafka topic. This case only arises when (committedOffset + 1) is no longer present in the topic. I will rephrase the message to state explicitly that (committedOffset + 1) is no longer part of the topic.



[GitHub] storm issue #2104: [STORM-2505] Spout to support topic compaction

Posted by vivekmittal <gi...@git.apache.org>.
Github user vivekmittal commented on the issue:

    https://github.com/apache/storm/pull/2104
  
    @harshach Method name is addressed. Squashed all my commits into one.



[GitHub] storm issue #2104: [STORM-2505] Spout to support topic compaction

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2104
  
    LGTM. Good solution :)



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by vivekmittal <gi...@git.apache.org>.
Github user vivekmittal commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115646942
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -49,10 +51,14 @@ public OffsetManager(TopicPartition tp, long initialFetchOffset) {
             LOG.debug("Instantiated {}", this);
         }
     
    -    public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
    +    public void ack(KafkaSpoutMessageId msgId) {          // O(Log N)
    --- End diff --
    
    Makes sense. I will change it.



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115305869
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -68,13 +74,34 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
     
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
    -            if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
    +            currOffset = currAckedMsg.offset();
    --- End diff --
    
    Why this change, when the code is exactly the same?



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2104



[GitHub] storm issue #2104: [STORM-2505] Spout to support topic compaction

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on the issue:

    https://github.com/apache/storm/pull/2104
  
    I want to add a note: this issue can also happen with non-compacted topics. We had multiple topologies get stuck because of it, including one consuming a topic with the "delete" cleanup policy. Based on the debugging @vivekmittal and I did, even "delete" topics can hit this issue under the following circumstances:
    
    - A Kafka topic with a relatively short retention period
    - A Storm topology that consumes from that topic at a slow processing rate
    - A fallback strategy of earliest offset or uncommitted_earliest
    
    Given the above, while the spout/consumer is busy processing messages fetched from a particular partition, the asynchronous log cleaner on the Kafka broker managing that partition can run and delete expired logs. When the spout fetches the next batch on a subsequent poll, it is then likely to see message offsets that do not follow on from the ones it received in the previous batch.
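The sequence above can be modeled with a toy partition log. This is a hypothetical sketch: `ToyPartition` is not a Kafka API, retention here removes individual records rather than whole log segments, and the consumer is assumed to reset to the earliest remaining offset (with 'latest' it would jump to the log end instead).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical toy partition (not a Kafka API): offsets map to values, retention drops a prefix.
final class ToyPartition {
    private final NavigableMap<Long, String> log = new TreeMap<>();

    void append(long offset, String value) {
        log.put(offset, value);
    }

    /** Models time-based retention: every record below newLogStart is deleted. */
    void deleteBefore(long newLogStart) {
        log.headMap(newLogStart, false).clear();
    }

    /** Returns up to maxRecords offsets from position; a deleted position is reset to the earliest offset. */
    List<Long> fetch(long position, int maxRecords) {
        long start = position;
        if (!log.isEmpty() && start < log.firstKey()) {
            start = log.firstKey(); // assumed 'earliest' reset
        }
        List<Long> batch = new ArrayList<>();
        for (long offset : log.tailMap(start, true).keySet()) {
            if (batch.size() == maxRecords) {
                break;
            }
            batch.add(offset);
        }
        return batch;
    }

    public static void main(String[] args) {
        ToyPartition partition = new ToyPartition();
        for (long offset = 0; offset < 20; offset++) {
            partition.append(offset, "value-" + offset);
        }
        List<Long> first = partition.fetch(0, 5);  // the topology processes these slowly
        partition.deleteBefore(10);                // meanwhile retention removes offsets 0..9
        List<Long> second = partition.fetch(5, 5); // offset 5 is gone, so the consumer is reset
        System.out.println(first + " then " + second); // prints [0, 1, 2, 3, 4] then [10, 11, 12, 13, 14]
    }
}
```

Offsets 5 to 9 were never fetched, so they were never emitted or acked; a spout that had processed up to offset 4 would wait for offset 5 indefinitely under the pre-patch logic.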



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by vivekmittal <gi...@git.apache.org>.
Github user vivekmittal commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115405683
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -68,13 +74,34 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
     
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
    -            if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
    +            currOffset = currAckedMsg.offset();
    --- End diff --
    
    currOffset is used in multiple places, so I extracted it into a variable.



[GitHub] storm issue #2104: [STORM-2505] Spout to support topic compaction

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/2104
  
    +1. Thanks @vivekmittal 



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115305431
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -49,10 +51,14 @@ public OffsetManager(TopicPartition tp, long initialFetchOffset) {
             LOG.debug("Instantiated {}", this);
         }
     
    -    public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
    +    public void ack(KafkaSpoutMessageId msgId) {          // O(Log N)
    --- End diff --
    
    Why rename this method? No acking happens here. A message is being added to the internal state of this object, which is what the name **add** reflects. The name **ack** is misleading.



[GitHub] storm issue #2104: [STORM-2505] Spout to support topic compaction

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/2104
  
    @vivekmittal can you squash your commits into a single one?
    "Topology stopped processing (or died) & topic got compacted (cleanup.policy=compact) leaving offset voids in the topic.
    
    Topology stopped processing (or died) & Topic got cleaned up (cleanup.policy=delete) and the offset."
    
    In both of these cases, don't we get an OffsetOutOfRangeException?



[GitHub] storm issue #2104: [STORM-2505] Spout to support topic compaction

Posted by vivekmittal <gi...@git.apache.org>.
Github user vivekmittal commented on the issue:

    https://github.com/apache/storm/pull/2104
  
    @harshach Will do.



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115310456
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -68,13 +74,34 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
     
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
    -            if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
    +            currOffset = currAckedMsg.offset();
    +            if (currOffset == nextCommitOffset + 1) {            // found the next offset to commit
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
    -            } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
    -                LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    -                break;
    +            } else if (currOffset > nextCommitOffset + 1) {
    +                if (emittedOffsets.contains(nextCommitOffset + 1)) {
    +                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    +                    break;
    +                } else {
    +                    /*
    +                        This case will arise in case of non contiguous offset being processed.
    +                        So, if the queue doesn't contain offset = committedOffset + 1 (possible
    +                        if the queue is compacted or deleted), the consumer should jump to
    +                        the next logical point in the queue. Next logical offset should be the
    +                        first element after committedOffset in the ascending ordered emitted set.
    +                     */
    +                    LOG.debug("Processed non contiguous offset, the previously committed offset has been deleted from the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset);
    +                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
    --- End diff --
    
    The ceiling method can return null. Does the current logic handle that case as well?



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115320142
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -113,6 +140,15 @@ public long commit(OffsetAndMetadata committedOffset) {
                     break;
                 }
             }
    +
    +        for (Iterator<Long> iterator = emittedOffsets.iterator(); iterator.hasNext();) {
    --- End diff --
    
    The emitted offsets have to be deleted at some point, or the set will grow indefinitely.
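One way to keep the set bounded, sketched under the assumptions that `emittedOffsets` is a `TreeSet<Long>` and that every offset up to and including the committed offset can be forgotten once the commit succeeds. It uses the live `headSet` view instead of the explicit iterator loop in the diff; `EmittedOffsetPruning` and `prune` are hypothetical names.

```java
import java.util.Arrays;
import java.util.NavigableSet;
import java.util.TreeSet;

final class EmittedOffsetPruning {

    /** Removes every emitted offset <= committedOffset and returns how many were removed. */
    static int prune(NavigableSet<Long> emittedOffsets, long committedOffset) {
        NavigableSet<Long> committed = emittedOffsets.headSet(committedOffset, true); // live view
        int removed = committed.size();
        committed.clear(); // clearing the view removes those elements from the backing set
        return removed;
    }

    public static void main(String[] args) {
        NavigableSet<Long> emitted = new TreeSet<>(Arrays.asList(3L, 4L, 5L, 8L, 9L));
        int removed = prune(emitted, 5L);
        System.out.println(removed + " removed, remaining " + emitted); // prints 3 removed, remaining [8, 9]
    }
}
```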



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115320329
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -49,10 +51,14 @@ public OffsetManager(TopicPartition tp, long initialFetchOffset) {
             LOG.debug("Instantiated {}", this);
         }
     
    -    public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
    +    public void ack(KafkaSpoutMessageId msgId) {          // O(Log N)
    --- End diff --
    
    Isn't **add** more ambiguous now that there are two different methods for adding offsets (the other being emit())?



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by vivekmittal <gi...@git.apache.org>.
Github user vivekmittal commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115406854
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -68,13 +74,34 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
     
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
    -            if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
    +            currOffset = currAckedMsg.offset();
    +            if (currOffset == nextCommitOffset + 1) {            // found the next offset to commit
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
    -            } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
    -                LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    -                break;
    +            } else if (currOffset > nextCommitOffset + 1) {
    +                if (emittedOffsets.contains(nextCommitOffset + 1)) {
    +                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    +                    break;
    +                } else {
    +                    /*
    +                        This case will arise in case of non contiguous offset being processed.
    +                        So, if the queue doesn't contain offset = committedOffset + 1 (possible
    +                        if the queue is compacted or deleted), the consumer should jump to
    +                        the next logical point in the queue. Next logical offset should be the
    +                        first element after committedOffset in the ascending ordered emitted set.
    +                     */
    +                    LOG.debug("Processed non contiguous offset, the previously committed offset has been deleted from the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset);
    +                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
    --- End diff --
    
    Will fix it.
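    The gap handling in this diff can be modeled with plain `TreeSet<Long>` sets. This is a hypothetical simplification, not the storm-kafka-client code. The class and method names are made up, and acked messages are bare offsets rather than KafkaSpoutMessageId objects. `strictNextCommitOffset` mirrors the removed lines, which stop at the first gap. `gapAwareNextCommitOffset` follows the added branch and rests on one assumption: a gap may be skipped only when no emitted offset inside it is still unacked. The sketch calls `higher(next)` where the diff calls `ceiling(nextCommitOffset)`. `ceiling` returns the least element greater than or equal to its argument, so it can return `next` itself if that offset is in the set, while `higher` returns the least element strictly greater. The thread does not say which fix was intended for that line.

```java
import java.util.Arrays;
import java.util.TreeSet;

/**
 * Hypothetical, simplified model of the commit search discussed in this PR.
 * Not the storm-kafka-client OffsetManager: offsets are plain Longs and the
 * method names are illustrative only.
 */
public class GapAwareCommitSketch {

    /** Pre-PR behaviour: only a strictly contiguous run of acked offsets is committable. */
    static long strictNextCommitOffset(long committedOffset, TreeSet<Long> acked) {
        long next = committedOffset;
        for (long curr : acked) {
            if (curr == next + 1) {
                next = curr; // contiguous: extend the commit point
            } else if (curr > next + 1) {
                break; // gap: stop searching
            }
        }
        return next;
    }

    /** Gap-aware behaviour: skip an offset that was never emitted (e.g. compacted away). */
    static long gapAwareNextCommitOffset(long committedOffset, TreeSet<Long> acked,
                                         TreeSet<Long> emitted) {
        long next = committedOffset;
        for (long curr : acked) {
            if (curr <= next) {
                continue; // already covered by the commit point
            }
            if (curr == next + 1) {
                next = curr; // contiguous: extend the commit point
            } else if (emitted.contains(next + 1)) {
                break; // next + 1 was emitted but is not acked yet, so wait for it
            } else {
                // next + 1 was never emitted. higher(next) is the smallest emitted
                // offset strictly greater than next.
                Long nextEmitted = emitted.higher(next);
                if (nextEmitted != null && nextEmitted == curr) {
                    next = curr; // no pending emitted offset inside the gap, so skip it
                } else {
                    break; // an emitted offset inside the gap is still unacked
                }
            }
        }
        return next;
    }

    public static void main(String[] args) {
        // The spout resumed after offset 4; offsets 5, 6 and 9 no longer exist in the topic.
        TreeSet<Long> emitted = new TreeSet<>(Arrays.asList(7L, 8L, 10L));
        TreeSet<Long> acked = new TreeSet<>(Arrays.asList(7L, 8L, 10L));
        System.out.println(strictNextCommitOffset(4L, acked));            // 4 (stalled)
        System.out.println(gapAwareNextCommitOffset(4L, acked, emitted)); // 10
    }
}
```

    With offsets 5, 6 and 9 missing, the strict search never advances past 4, while the gap-aware search commits up to 10. If offset 8 were emitted but not yet acked, the gap-aware search would stop at 7.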



[GitHub] storm issue #2104: [STORM-2505] Spout to support topic compaction

Posted by askprasanna <gi...@git.apache.org>.
Github user askprasanna commented on the issue:

    https://github.com/apache/storm/pull/2104
  
    We have set the auto.offset.reset property to 'latest', which is the default value in Kafka. The exception is thrown only when it is set to 'none'; we verified this in Fetcher.java in the Kafka clients library.
    
    We do see a Kafka client log message indicating that the offset is being reset.
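    The reset behaviour described here can be modeled as follows. This is a hypothetical simplification, not Kafka's Fetcher code. The enum and method names are made up, and `IllegalStateException` stands in for the `OffsetOutOfRangeException` that KafkaConsumer raises when the policy is 'none'. A position outside the log's current range (for example, below the log start after retention deleted old segments) is reset according to the policy. A position inside the range is fetched as-is, so a gap left by compaction does not by itself trigger a reset; the broker simply returns the next retained record.

```java
/**
 * Hypothetical, simplified model of how auto.offset.reset resolves a fetch
 * position. Not Kafka's Fetcher implementation.
 */
public class OffsetResetSketch {

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * Returns the position to fetch from. A position inside [logStartOffset, logEndOffset]
     * is used as-is; anything outside that range is reset according to the policy.
     */
    static long resolveFetchOffset(long requested, long logStartOffset, long logEndOffset,
                                   ResetPolicy policy) {
        if (requested >= logStartOffset && requested <= logEndOffset) {
            return requested; // in range: no reset, even if compaction left gaps
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset;
            case LATEST:
                return logEndOffset;
            default:
                // Only the "none" policy surfaces the condition to the application.
                throw new IllegalStateException("Offset " + requested + " is out of range ["
                        + logStartOffset + ", " + logEndOffset + "]");
        }
    }

    public static void main(String[] args) {
        // Retention deleted everything below offset 100; the spout asks for offset 42.
        System.out.println(resolveFetchOffset(42, 100, 250, ResetPolicy.LATEST));   // 250
        System.out.println(resolveFetchOffset(42, 100, 250, ResetPolicy.EARLIEST)); // 100
    }
}
```

    With 'latest', the consumer silently jumps to the log end and skips offsets 100 to 249. With 'none', the application sees an exception instead.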



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115316343
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -68,13 +74,34 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
     
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
    -            if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
    +            currOffset = currAckedMsg.offset();
    +            if (currOffset == nextCommitOffset + 1) {            // found the next offset to commit
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
    -            } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
    -                LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    -                break;
    +            } else if (currOffset > nextCommitOffset + 1) {
    +                if (emittedOffsets.contains(nextCommitOffset + 1)) {
    +                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    +                    break;
    +                } else {
    +                    /*
    +                        This case will arise in case of non contiguous offset being processed.
    +                        So, if the queue doesn't contain offset = committedOffset + 1 (possible
    +                        if the queue is compacted or deleted), the consumer should jump to
    +                        the next logical point in the queue. Next logical offset should be the
    +                        first element after committedOffset in the ascending ordered emitted set.
    +                     */
    +                    LOG.debug("Processed non contiguous offset, the previously committed offset has been deleted from the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset);
    --- End diff --
    
    "the previously committed offset has been deleted from the topic" - isn't this incorrect? If the offset has been committed, it has to be in the topic. I believe you meant to say something else in this message. Can you please rephrase it. Thanks.



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115551459
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -49,10 +51,14 @@ public OffsetManager(TopicPartition tp, long initialFetchOffset) {
             LOG.debug("Instantiated {}", this);
         }
     
    -    public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
    +    public void ack(KafkaSpoutMessageId msgId) {          // O(Log N)
    --- End diff --
    
    @vivekmittal I think "addAck" is better than just "ack", since ack is part of the Storm APIs and the method name might cause confusion. Probably better names would be "addToAckMsgs" and "addToEmitMsgs".
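    Storm's `ISpout` interface declares `ack(Object msgId)`, so an `OffsetManager` method also named `ack` could be mistaken for the spout callback. Below is a hypothetical skeleton using the names proposed in this comment. The class name and the `isPending` helper are illustrative assumptions. Offsets are plain longs here, whereas the OffsetManager in the diff keeps KafkaSpoutMessageId objects for acked messages.

```java
import java.util.TreeSet;

/**
 * Hypothetical skeleton using the method names suggested in this review.
 * Not the storm-kafka-client OffsetManager.
 */
public class OffsetTrackerSketch {
    private final TreeSet<Long> ackedOffsets = new TreeSet<>();
    private final TreeSet<Long> emittedOffsets = new TreeSet<>();

    /** Records an acked offset; TreeSet insertion is O(log N). */
    public void addToAckMsgs(long offset) {
        ackedOffsets.add(offset);
    }

    /** Records an emitted offset; TreeSet insertion is O(log N). */
    public void addToEmitMsgs(long offset) {
        emittedOffsets.add(offset);
    }

    /** Hypothetical helper: true if the offset was emitted but has not been acked yet. */
    public boolean isPending(long offset) {
        return emittedOffsets.contains(offset) && !ackedOffsets.contains(offset);
    }

    public static void main(String[] args) {
        OffsetTrackerSketch tracker = new OffsetTrackerSketch();
        tracker.addToEmitMsgs(7L);
        System.out.println(tracker.isPending(7L)); // true
        tracker.addToAckMsgs(7L);
        System.out.println(tracker.isPending(7L)); // false
    }
}
```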



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by vivekmittal <gi...@git.apache.org>.
Github user vivekmittal commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115413017
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -68,13 +74,34 @@ public OffsetAndMetadata findNextCommitOffset() {
             KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
     
             for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
    -            if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
    +            currOffset = currAckedMsg.offset();
    +            if (currOffset == nextCommitOffset + 1) {            // found the next offset to commit
                     found = true;
                     nextCommitMsg = currAckedMsg;
                     nextCommitOffset = currOffset;
    -            } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
    -                LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    -                break;
    +            } else if (currOffset > nextCommitOffset + 1) {
    +                if (emittedOffsets.contains(nextCommitOffset + 1)) {
    +                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
    +                    break;
    +                } else {
    +                    /*
    +                        This case will arise in case of non contiguous offset being processed.
    +                        So, if the queue doesn't contain offset = committedOffset + 1 (possible
    +                        if the queue is compacted or deleted), the consumer should jump to
    --- End diff --
    
    Thanks for pointing it out. Will change the text.
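    The distinction the reviewer draws can be made concrete with a toy compaction model. In it, the committed offset itself can survive while committedOffset + 1 is removed, so the missing piece is the next offset, not the committed one. This is a hypothetical simplification: the class and method names are made up, only the newest record per key is kept, and survivors keep their original offsets. Real Kafka compaction works per segment and never compacts the active segment; the sketch ignores segments.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of log compaction: only the newest record per key
 * survives, and surviving records keep their original offsets.
 */
public class CompactionSketch {

    /** keys[i] is the key of the record at offset i; returns the surviving offsets in order. */
    static List<Long> compact(String[] keys) {
        Map<String, Long> newestOffsetByKey = new HashMap<>();
        for (int offset = 0; offset < keys.length; offset++) {
            newestOffsetByKey.put(keys[offset], (long) offset); // a newer record replaces an older one
        }
        List<Long> survivors = new ArrayList<>(newestOffsetByKey.values());
        Collections.sort(survivors);
        return survivors;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c", "b", "d", "c"}; // offsets 0..5
        // Suppose the spout committed offset 0 before the topology stopped.
        System.out.println(compact(keys)); // [0, 3, 4, 5]: offset 0 survives, offset 1 does not
    }
}
```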



[GitHub] storm pull request #2104: [STORM-2505] Spout to support topic compaction

Posted by vivekmittal <gi...@git.apache.org>.
Github user vivekmittal commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2104#discussion_r115406299
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java ---
    @@ -49,10 +51,14 @@ public OffsetManager(TopicPartition tp, long initialFetchOffset) {
             LOG.debug("Instantiated {}", this);
         }
     
    -    public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
    +    public void ack(KafkaSpoutMessageId msgId) {          // O(Log N)
    --- End diff --
    
    The OffsetManager now maintains an acked set as well as an emitted set. As @srdo pointed out, **add** would be a little ambiguous.
    I can make the names more verbose if needed, something like **addAck** and **addEmit**.



[GitHub] storm issue #2104: [STORM-2505] Spout to support topic compaction

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/2104
  
    @vivekmittal I think you need to open another PR against 1.x-branch. I don't think this can be cherry-picked onto 1.x-branch.

