Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/03 14:41:41 UTC
[23/40] storm git commit: [STORM-2505] Spout to support topic compaction
[STORM-2505] Spout to support topic compaction
[STORM-2505] Maintaining an emitted set in OffsetManager to handle voids in the topic
[STORM-2505] Handling NPE in boxed Long to primitive type comparison
[STORM-2505] Rephrased the log message when a non-contiguous offset is acked by the spout
[STORM-2505] Updated comment
[STORM-2505] Renamed the methods ack/emit to addToAckMsgs and addToEmitMsgs in OffsetManager
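The gist of the change: on a compacted (or partially deleted) topic, offsets are no longer contiguous, so a commit algorithm that insists on seeing offset committedOffset + 1 can stall forever waiting for an offset that no longer exists. Tracking every emitted offset in a sorted set lets the spout tell a void left by compaction apart from a tuple that is merely pending. A minimal standalone sketch of the idea (a simplified model with illustrative names, not the actual spout code):

    import java.util.NavigableSet;
    import java.util.TreeSet;

    // Simplified model of the gap-skipping commit logic this patch adds to
    // OffsetManager.findNextCommitOffset(). All names here are illustrative.
    public class CompactionGapSketch {
        public static void main(String[] args) {
            NavigableSet<Long> emitted = new TreeSet<>(); // every offset handed to the topology
            NavigableSet<Long> acked = new TreeSet<>();   // offsets acked so far

            // Compaction removed offsets 3 and 4, so they were never emitted.
            for (long offset : new long[] {0, 1, 2, 5, 6}) {
                emitted.add(offset);
                acked.add(offset);
            }

            long nextCommit = -1; // nothing committed yet
            for (long offset : acked) {
                if (offset == nextCommit + 1) {
                    nextCommit = offset; // contiguous, safe to commit
                } else if (!emitted.contains(nextCommit + 1)) {
                    // nextCommit + 1 was never emitted: it is a void left by
                    // compaction, so jump to the next offset that actually exists.
                    Long nextEmitted = emitted.ceiling(nextCommit + 1);
                    if (nextEmitted != null && offset == nextEmitted.longValue()) {
                        nextCommit = offset;
                    } else {
                        break; // this acked offset is not the next live offset
                    }
                } else {
                    break; // nextCommit + 1 exists but has not been acked yet
                }
            }
            System.out.println("next offset to commit: " + nextCommit); // prints 6
        }
    }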
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/97605c1b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/97605c1b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/97605c1b
Branch: refs/heads/1.1.x-branch
Commit: 97605c1b1bd1fc286facaf36ceeb85fd31298fdf
Parents: 796e715
Author: Vivek Mittal <vi...@flipkart.com>
Authored: Mon May 8 10:43:36 2017 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 29 16:45:44 2017 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 21 ++++-----
.../kafka/spout/internal/OffsetManager.java | 47 +++++++++++++++++---
2 files changed, 53 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/97605c1b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 47c305b..32542b9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -77,7 +77,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
// Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
- private transient Map<TopicPartition, OffsetManager> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate
+ private transient Map<TopicPartition, OffsetManager> offsetManagers; // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate
private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
@@ -117,7 +117,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
- acked = new HashMap<>();
+ offsetManagers = new HashMap<>();
emitted = new HashSet<>();
waitingToEmit = Collections.emptyListIterator();
@@ -147,7 +147,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private void initialize(Collection<TopicPartition> partitions) {
if (!consumerAutoCommitMode) {
- acked.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout
+ offsetManagers.keySet().retainAll(partitions); // remove from offsetManagers all partitions that are no longer assigned to this spout
}
retryService.retainAll(partitions);
@@ -205,8 +205,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private void setAcked(TopicPartition tp, long fetchOffset) {
// If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
- if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
- acked.put(tp, new OffsetManager(tp, fetchOffset));
+ if (!consumerAutoCommitMode && !offsetManagers.containsKey(tp)) {
+ offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
}
}
@@ -319,7 +319,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
- if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked
+ if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
@@ -337,6 +337,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
} else {
emitted.add(msgId);
+ offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule.
retryService.remove(msgId);
} else { //New tuple, hence increment the uncommitted offset counter
@@ -371,7 +372,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private void commitOffsetsForAckedTuples() {
// Find offsets that are ready to be committed for every topic partition
final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
- for (Map.Entry<TopicPartition, OffsetManager> tpOffset : acked.entrySet()) {
+ for (Map.Entry<TopicPartition, OffsetManager> tpOffset : offsetManagers.entrySet()) {
final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
if (nextCommitOffset != null) {
nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
@@ -387,7 +388,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
//Update the OffsetManager for each committed partition, and update numUncommittedOffsets
final TopicPartition tp = tpOffset.getKey();
- final OffsetManager offsetManager = acked.get(tp);
+ final OffsetManager offsetManager = offsetManagers.get(tp);
long numCommittedOffsets = offsetManager.commit(tpOffset.getValue());
numUncommittedOffsets -= numCommittedOffsets;
LOG.debug("[{}] uncommitted offsets across all topic partitions",
@@ -413,7 +414,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
} else {
if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
- acked.get(msgId.getTopicPartition()).add(msgId);
+ offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
}
emitted.remove(msgId);
}
@@ -493,7 +494,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public String toString() {
return "KafkaSpout{" +
- "acked=" + acked +
+ "offsetManagers =" + offsetManagers +
", emitted=" + emitted +
"}";
}
http://git-wip-us.apache.org/repos/asf/storm/blob/97605c1b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 4ce0471..0bf4132 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -39,6 +39,8 @@ public class OffsetManager {
private final long initialFetchOffset;
// Last offset committed to Kafka. Initially it is set to fetchOffset - 1
private long committedOffset;
+ // Offsets of emitted tuples, sorted in ascending order
+ private final NavigableSet<Long> emittedOffsets = new TreeSet<>();
// Acked messages sorted by ascending order of offset
private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);
@@ -49,10 +51,14 @@ public class OffsetManager {
LOG.debug("Instantiated {}", this);
}
- public void add(KafkaSpoutMessageId msgId) { // O(Log N)
+ public void addToAckMsgs(KafkaSpoutMessageId msgId) { // O(Log N)
ackedMsgs.add(msgId);
}
+ public void addToEmitMsgs(long offset) {
+ this.emittedOffsets.add(offset); // O(Log N)
+ }
+
/**
* An offset is only committed when all records with lower offset have been
* acked. This guarantees that all offsets smaller than the committedOffset
@@ -68,13 +74,34 @@ public class OffsetManager {
KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
- if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) { // found the next offset to commit
+ currOffset = currAckedMsg.offset();
+ if (currOffset == nextCommitOffset + 1) { // found the next offset to commit
found = true;
nextCommitMsg = currAckedMsg;
nextCommitOffset = currOffset;
- } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search
- LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
- break;
+ } else if (currOffset > nextCommitOffset + 1) {
+ if (emittedOffsets.contains(nextCommitOffset + 1)) {
+ LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
+ break;
+ } else {
+ /*
+ This case arises when a non-contiguous offset is being processed. If
+ the topic does not contain offset = committedOffset + 1 (possible when
+ the topic has been compacted, or records have been deleted), the
+ consumer should jump to the next logical point in the topic: the first
+ element after committedOffset in the ascending-ordered emitted set.
+ */
+ LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset);
+ final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
+ if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
+ found = true;
+ nextCommitMsg = currAckedMsg;
+ nextCommitOffset = currOffset;
+ } else {
+ LOG.debug("topic-partition [{}] has non-continuous offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset);
+ break;
+ }
+ }
} else {
//Received a redundant ack. Ignore and continue processing.
LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
@@ -113,6 +140,15 @@ public class OffsetManager {
break;
}
}
+
+ for (Iterator<Long> iterator = emittedOffsets.iterator(); iterator.hasNext();) {
+ if (iterator.next() <= committedOffset.offset()) {
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
+
LOG.trace("{}", this);
LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].",
@@ -143,6 +179,7 @@ public class OffsetManager {
+ "topic-partition=" + tp
+ ", fetchOffset=" + initialFetchOffset
+ ", committedOffset=" + committedOffset
+ + ", emittedOffsets=" + emittedOffsets
+ ", ackedMsgs=" + ackedMsgs
+ '}';
}