Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/03 14:41:41 UTC

[23/40] storm git commit: [STORM-2505] Spout to support topic compaction

[STORM-2505] Spout to support topic compaction

[STORM-2505] Maintaining an emitted set in OffsetManager to handle voids in the topic

[STORM-2505] Handling NPE in Boxed Long to primitive type comparison

[STORM-2505] Rephrased the log message when a non-contiguous offset is acked by the spout

[STORM-2505] Updated comment

[STORM-2505] Renamed the methods ack/emit to addToAckMsgs and addToEmitMsgs in OffsetManager
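
In short: the spout now records every emitted offset in a sorted set, so the
commit logic can tell a compaction void (an offset that no longer exists in
the topic) apart from an offset that simply has not been acked yet. A minimal
standalone sketch of the idea follows; class and variable names are
illustrative only, not the actual spout code:

    import java.util.NavigableSet;
    import java.util.TreeSet;

    // Sketch of the gap-aware commit search, assuming offsets 5 and 6 were
    // compacted away and the spout emitted 7, 8, 9 and saw acks for 7 and 8.
    public class GapAwareCommitSketch {
        public static void main(String[] args) {
            long committed = 4;                            // last committed offset
            NavigableSet<Long> emitted = new TreeSet<>();  // every emitted offset
            NavigableSet<Long> acked = new TreeSet<>();    // acked, awaiting commit
            emitted.add(7L); emitted.add(8L); emitted.add(9L);
            acked.add(7L); acked.add(8L);

            long nextCommit = committed;
            for (long offset : acked) {
                if (offset == nextCommit + 1) {
                    nextCommit = offset;                   // contiguous: safe to commit
                } else if (!emitted.contains(nextCommit + 1)) {
                    // nextCommit + 1 was never emitted, so it no longer exists in
                    // the topic (compaction/deletion): jump to the next emitted offset.
                    Long jump = emitted.ceiling(nextCommit + 1);
                    if (jump != null && offset == jump) {
                        nextCommit = offset;
                    } else {
                        break;                             // a real gap: stop the search
                    }
                } else {
                    break;   // nextCommit + 1 was emitted but not acked yet: wait
                }
            }
            System.out.println("next offset to commit: " + nextCommit);  // prints 8
        }
    }

Offset 9 stays uncommitted because it was emitted but never acked; only the
void between 4 and 7 is skipped.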


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/97605c1b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/97605c1b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/97605c1b

Branch: refs/heads/1.1.x-branch
Commit: 97605c1b1bd1fc286facaf36ceeb85fd31298fdf
Parents: 796e715
Author: Vivek Mittal <vi...@flipkart.com>
Authored: Mon May 8 10:43:36 2017 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 29 16:45:44 2017 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 21 ++++-----
 .../kafka/spout/internal/OffsetManager.java     | 47 +++++++++++++++++---
 2 files changed, 53 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/97605c1b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 47c305b..32542b9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -77,7 +77,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient boolean initialized;                              // Flag indicating that the spout is still undergoing the initialization process.
     // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
-    private transient Map<TopicPartition, OffsetManager> acked;         // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate
+    private transient Map<TopicPartition, OffsetManager> offsetManagers; // Tuples that were successfully acked or emitted. Acked tuples are committed periodically when the commit timer expires, after a consumer rebalance, or during close/deactivate
     private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
     private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
     private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
@@ -117,7 +117,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
         refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
 
-        acked = new HashMap<>();
+        offsetManagers = new HashMap<>();
         emitted = new HashSet<>();
         waitingToEmit = Collections.emptyListIterator();
 
@@ -147,7 +147,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         private void initialize(Collection<TopicPartition> partitions) {
             if (!consumerAutoCommitMode) {
-                acked.keySet().retainAll(partitions);   // remove from acked all partitions that are no longer assigned to this spout
+                offsetManagers.keySet().retainAll(partitions);   // remove from offsetManagers all partitions that are no longer assigned to this spout
             }
 
             retryService.retainAll(partitions);
@@ -205,8 +205,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void setAcked(TopicPartition tp, long fetchOffset) {
         // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
-        if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
-            acked.put(tp, new OffsetManager(tp, fetchOffset));
+        if (!consumerAutoCommitMode && !offsetManagers.containsKey(tp)) {
+            offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
         }
     }
 
@@ -319,7 +319,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
         final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
-        if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) {   // has been acked
+        if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
         } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
@@ -337,6 +337,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                         }
                     } else {
                         emitted.add(msgId);
+                        offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
                         if (isScheduled) {  // Was scheduled for retry and re-emitted, so remove from schedule.
                             retryService.remove(msgId);
                        } else {            // New tuple, hence increment the uncommitted offset counter
@@ -371,7 +372,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private void commitOffsetsForAckedTuples() {
         // Find offsets that are ready to be committed for every topic partition
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : acked.entrySet()) {
+        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : offsetManagers.entrySet()) {
             final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
             if (nextCommitOffset != null) {
                 nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
@@ -387,7 +388,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
                 //Update the OffsetManager for each committed partition, and update numUncommittedOffsets
                 final TopicPartition tp = tpOffset.getKey();
-                final OffsetManager offsetManager = acked.get(tp);
+                final OffsetManager offsetManager = offsetManagers.get(tp);
                 long numCommittedOffsets = offsetManager.commit(tpOffset.getValue());
                 numUncommittedOffsets -= numCommittedOffsets;
                 LOG.debug("[{}] uncommitted offsets across all topic partitions",
@@ -413,7 +414,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             }
         } else {
             if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
-                acked.get(msgId.getTopicPartition()).add(msgId);
+                offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
             }
             emitted.remove(msgId);
         }
@@ -493,7 +494,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public String toString() {
         return "KafkaSpout{" +
-                "acked=" + acked +
+                "offsetManagers =" + offsetManagers +
                 ", emitted=" + emitted +
                 "}";
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/97605c1b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 4ce0471..0bf4132 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -39,6 +39,8 @@ public class OffsetManager {
     private final long initialFetchOffset;
     // Last offset committed to Kafka. Initially it is set to fetchOffset - 1
     private long committedOffset;
+    // Offsets of emitted tuples, sorted in ascending order
+    private final NavigableSet<Long> emittedOffsets = new TreeSet<>();
     // Acked messages sorted by ascending order of offset
     private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);
 
@@ -49,10 +51,14 @@ public class OffsetManager {
         LOG.debug("Instantiated {}", this);
     }
 
-    public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
+    public void addToAckMsgs(KafkaSpoutMessageId msgId) {          // O(Log N)
         ackedMsgs.add(msgId);
     }
 
+    public void addToEmitMsgs(long offset) {
+        this.emittedOffsets.add(offset);                  // O(Log N)
+    }
+
     /**
      * An offset is only committed when all records with lower offset have been
      * acked. This guarantees that all offsets smaller than the committedOffset
@@ -68,13 +74,34 @@ public class OffsetManager {
         KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
 
        for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeSet
-            if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) {            // found the next offset to commit
+            currOffset = currAckedMsg.offset();
+            if (currOffset == nextCommitOffset + 1) {            // found the next offset to commit
                 found = true;
                 nextCommitMsg = currAckedMsg;
                 nextCommitOffset = currOffset;
-            } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
-                LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
-                break;
+            } else if (currOffset > nextCommitOffset + 1) {
+                if (emittedOffsets.contains(nextCommitOffset + 1)) {
+                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
+                    break;
+                } else {
+                    /*
+                        This case arises when a non-contiguous offset has been processed.
+                        If the topic no longer contains offset = committedOffset + 1 (possible
+                        when the topic has been compacted or records have been deleted),
+                        the consumer should jump to the next logical point in the topic:
+                        the first element after committedOffset in the ascending ordered emitted set.
+                     */
+                    LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset);
+                    final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
+                    if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
+                        found = true;
+                        nextCommitMsg = currAckedMsg;
+                        nextCommitOffset = currOffset;
+                    } else {
+                        LOG.debug("topic-partition [{}] has non-continuous offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset);
+                        break;
+                    }
+                }
             } else {
                 //Received a redundant ack. Ignore and continue processing.
                 LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
@@ -113,6 +140,15 @@ public class OffsetManager {
                 break;
             }
         }
+        // Purge emitted offsets that are at or below the newly committed offset; they are no longer needed to detect gaps
+        for (Iterator<Long> iterator = emittedOffsets.iterator(); iterator.hasNext();) {
+            if (iterator.next() <= committedOffset.offset()) {
+                iterator.remove();
+            } else {
+                break;
+            }
+        }
+
         LOG.trace("{}", this);
         
         LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].",
@@ -143,6 +179,7 @@ public class OffsetManager {
             + "topic-partition=" + tp
             + ", fetchOffset=" + initialFetchOffset
             + ", committedOffset=" + committedOffset
+            + ", emittedOffsets=" + emittedOffsets
             + ", ackedMsgs=" + ackedMsgs
             + '}';
     }
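
A note on the null check around emittedOffsets.ceiling(...) above, which is
the "Handling NPE in Boxed Long to primitive type comparison" fix from the
commit message: NavigableSet.ceiling() returns null when no element is greater
than or equal to the argument, and comparing that boxed Long directly against
a primitive long auto-unboxes it and throws NullPointerException. A
self-contained illustration with hypothetical values:

    import java.util.NavigableSet;
    import java.util.TreeSet;

    public class CeilingNullCheckSketch {
        public static void main(String[] args) {
            NavigableSet<Long> emittedOffsets = new TreeSet<>();
            emittedOffsets.add(3L);

            long currOffset = 10;
            Long next = emittedOffsets.ceiling(5L);    // null: no emitted offset >= 5

            // if (currOffset == next) { ... }         // throws NullPointerException
            //                                         // when next is unboxed
            if (next != null && currOffset == next) {  // the safe form used above
                System.out.println("offset is contiguous with the next emitted offset");
            } else {
                System.out.println("no emitted offset at or above 5; stop the search");
            }
        }
    }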