You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/05/13 05:13:56 UTC
[1/3] storm git commit: [STORM-2505] Spout to support topic compaction
Repository: storm
Updated Branches:
refs/heads/1.x-branch ea7e7f754 -> 979153be8
[STORM-2505] Spout to support topic compaction
[STORM-2505] Maintaining an emitted set in OffsetManager to handle the voids in the topic
[STORM-2505] Handling NPE in Boxed Long to primitive type comparison
[STORM-2505] Rephrased the log message when a non-contiguous offset is acked by the spout
[STORM-2505] Updated comment
[STORM-2505] Renamed the methods ack/emit to addToAckMsgs and addToEmitMsgs in OffsetManager
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a2dde20c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a2dde20c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a2dde20c
Branch: refs/heads/1.x-branch
Commit: a2dde20cc4313ec373946e0845742eabdce67017
Parents: 95ec555
Author: Vivek Mittal <vi...@flipkart.com>
Authored: Mon May 8 10:43:36 2017 +0530
Committer: Vivek Mittal <vi...@flipkart.com>
Committed: Wed May 10 11:10:27 2017 +0530
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 21 ++++-----
.../kafka/spout/internal/OffsetManager.java | 47 +++++++++++++++++---
2 files changed, 53 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a2dde20c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 47c305b..32542b9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -77,7 +77,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process.
// Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
- private transient Map<TopicPartition, OffsetManager> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate
+ private transient Map<TopicPartition, OffsetManager> offsetManagers;// Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires, or after a consumer rebalance, or during close/deactivate
private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed. Not used if it's AutoCommitMode
private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed. Not used if auto commit mode is enabled.
@@ -117,7 +117,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
- acked = new HashMap<>();
+ offsetManagers = new HashMap<>();
emitted = new HashSet<>();
waitingToEmit = Collections.emptyListIterator();
@@ -147,7 +147,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private void initialize(Collection<TopicPartition> partitions) {
if (!consumerAutoCommitMode) {
- acked.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout
+ offsetManagers.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout
}
retryService.retainAll(partitions);
@@ -205,8 +205,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private void setAcked(TopicPartition tp, long fetchOffset) {
// If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
- if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
- acked.put(tp, new OffsetManager(tp, fetchOffset));
+ if (!consumerAutoCommitMode && !offsetManagers.containsKey(tp)) {
+ offsetManagers.put(tp, new OffsetManager(tp, fetchOffset));
}
}
@@ -319,7 +319,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
- if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked
+ if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
@@ -337,6 +337,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
} else {
emitted.add(msgId);
+ offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule.
retryService.remove(msgId);
} else { //New tuple, hence increment the uncommitted offset counter
@@ -371,7 +372,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private void commitOffsetsForAckedTuples() {
// Find offsets that are ready to be committed for every topic partition
final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
- for (Map.Entry<TopicPartition, OffsetManager> tpOffset : acked.entrySet()) {
+ for (Map.Entry<TopicPartition, OffsetManager> tpOffset : offsetManagers.entrySet()) {
final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
if (nextCommitOffset != null) {
nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
@@ -387,7 +388,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
//Update the OffsetManager for each committed partition, and update numUncommittedOffsets
final TopicPartition tp = tpOffset.getKey();
- final OffsetManager offsetManager = acked.get(tp);
+ final OffsetManager offsetManager = offsetManagers.get(tp);
long numCommittedOffsets = offsetManager.commit(tpOffset.getValue());
numUncommittedOffsets -= numCommittedOffsets;
LOG.debug("[{}] uncommitted offsets across all topic partitions",
@@ -413,7 +414,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
} else {
if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
- acked.get(msgId.getTopicPartition()).add(msgId);
+ offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
}
emitted.remove(msgId);
}
@@ -493,7 +494,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public String toString() {
return "KafkaSpout{" +
- "acked=" + acked +
+ "offsetManagers =" + offsetManagers +
", emitted=" + emitted +
"}";
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a2dde20c/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 4ce0471..0bf4132 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -39,6 +39,8 @@ public class OffsetManager {
private final long initialFetchOffset;
// Last offset committed to Kafka. Initially it is set to fetchOffset - 1
private long committedOffset;
+ // Emitted Offsets List
+ private final NavigableSet<Long> emittedOffsets = new TreeSet<>();
// Acked messages sorted by ascending order of offset
private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);
@@ -49,10 +51,14 @@ public class OffsetManager {
LOG.debug("Instantiated {}", this);
}
- public void add(KafkaSpoutMessageId msgId) { // O(Log N)
+ public void addToAckMsgs(KafkaSpoutMessageId msgId) { // O(Log N)
ackedMsgs.add(msgId);
}
+ public void addToEmitMsgs(long offset) {
+ this.emittedOffsets.add(offset); // O(Log N)
+ }
+
/**
* An offset is only committed when all records with lower offset have been
* acked. This guarantees that all offsets smaller than the committedOffset
@@ -68,13 +74,34 @@ public class OffsetManager {
KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
- if ((currOffset = currAckedMsg.offset()) == nextCommitOffset + 1) { // found the next offset to commit
+ currOffset = currAckedMsg.offset();
+ if (currOffset == nextCommitOffset + 1) { // found the next offset to commit
found = true;
nextCommitMsg = currAckedMsg;
nextCommitOffset = currOffset;
- } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous to the offsets listed to go in the next commit, so stop search
- LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
- break;
+ } else if (currOffset > nextCommitOffset + 1) {
+ if (emittedOffsets.contains(nextCommitOffset + 1)) {
+ LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
+ break;
+ } else {
+ /*
+ This case will arise in case of non contiguous offset being processed.
+ So, if the topic doesn't contain offset = committedOffset + 1 (possible
+ if the topic is compacted or deleted), the consumer should jump to
+ the next logical point in the topic. Next logical offset should be the
+ first element after committedOffset in the ascending ordered emitted set.
+ */
+ LOG.debug("Processed non contiguous offset. (committedOffset+1) is no longer part of the topic. Committed: [{}], Processed: [{}]", committedOffset, currOffset);
+ final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
+ if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
+ found = true;
+ nextCommitMsg = currAckedMsg;
+ nextCommitOffset = currOffset;
+ } else {
+ LOG.debug("topic-partition [{}] has non-continuous offset [{}]. Next Offset to commit should be [{}]", tp, currOffset, nextEmittedOffset);
+ break;
+ }
+ }
} else {
//Received a redundant ack. Ignore and continue processing.
LOG.warn("topic-partition [{}] has unexpected offset [{}]. Current committed Offset [{}]",
@@ -113,6 +140,15 @@ public class OffsetManager {
break;
}
}
+
+ for (Iterator<Long> iterator = emittedOffsets.iterator(); iterator.hasNext();) {
+ if (iterator.next() <= committedOffset.offset()) {
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
+
LOG.trace("{}", this);
LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].",
@@ -143,6 +179,7 @@ public class OffsetManager {
+ "topic-partition=" + tp
+ ", fetchOffset=" + initialFetchOffset
+ ", committedOffset=" + committedOffset
+ + ", emittedOffsets=" + emittedOffsets
+ ", ackedMsgs=" + ackedMsgs
+ '}';
}
[2/3] storm git commit: Merge branch '1.x-branch' of
https://github.com/vivekmittal/storm into 1.x-branch
Posted by sr...@apache.org.
Merge branch '1.x-branch' of https://github.com/vivekmittal/storm into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/958f53f0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/958f53f0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/958f53f0
Branch: refs/heads/1.x-branch
Commit: 958f53f013363128189c0dd022d8d2bdec8e1078
Parents: ea7e7f7 a2dde20
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri May 12 22:03:32 2017 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri May 12 22:03:32 2017 -0700
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 21 ++++-----
.../kafka/spout/internal/OffsetManager.java | 47 +++++++++++++++++---
2 files changed, 53 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: Added STORM-2505 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-2505 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/979153be
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/979153be
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/979153be
Branch: refs/heads/1.x-branch
Commit: 979153be8ae09ecdc368d10a5aa01db7685ef02c
Parents: 958f53f
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Fri May 12 22:04:18 2017 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Fri May 12 22:04:18 2017 -0700
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/979153be/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3d9e053..daf20a4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.1
+ * STORM-2505: Spout to support topic compaction
* STORM-2498: Fix Download Full File link
* STORM-2191: shorten classpaths by using wildcards
* STORM-2482: Refactor the Storm auto credential plugins to be more usable