You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/02/03 02:52:19 UTC
[1/3] storm git commit: STORM-2014: Make KafkaSpout delegate maxRetry check to RetryService
Repository: storm
Updated Branches:
refs/heads/1.x-branch fa436e132 -> bd370f568
STORM-2014: Make KafkaSpout delegate maxRetry check to RetryService
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9584eace
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9584eace
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9584eace
Branch: refs/heads/1.x-branch
Commit: 9584eace85b259c5b4ff37668a183a3a62edca9a
Parents: fa436e1
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Tue Aug 2 19:17:02 2016 +0200
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 3 11:48:45 2017 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 13 ++++-----
.../storm/kafka/spout/KafkaSpoutConfig.java | 26 ++---------------
.../KafkaSpoutRetryExponentialBackoff.java | 30 ++++++++++++--------
.../kafka/spout/KafkaSpoutRetryService.java | 12 ++++++--
4 files changed, 35 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9584eace/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index e0a3451..f778288 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -72,7 +72,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Bookkeeping
- private transient int maxRetries; // Max number of times a tuple is retried
private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation
private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure
private transient Timer commitTimer; // timer == null for auto commit mode
@@ -104,7 +103,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Spout internals
this.collector = collector;
- maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
numUncommittedOffsets = 0;
// Offset management
@@ -389,11 +387,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
return;
}
- if (msgId.numFails() < maxRetries) {
- emitted.remove(msgId);
- msgId.incrementNumFails();
- retryService.schedule(msgId);
- } else { // limit to max number of retries
+ emitted.remove(msgId);
+ msgId.incrementNumFails();
+ if (!retryService.schedule(msgId)) {
LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
ack(msgId);
}
@@ -511,6 +507,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
/**
+ * An offset is only committed when all records with lower offset have
+ * been acked. This guarantees that all offsets smaller than the
+ * committedOffset have been delivered.
* @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit.
*/
public OffsetAndMetadata findNextCommitOffset() {
http://git-wip-us.apache.org/repos/asf/storm/blob/9584eace/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index db07fda..2b81dea 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -71,7 +71,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
LATEST,
UNCOMMITTED_EARLIEST,
UNCOMMITTED_LATEST }
-
+
public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
}
@@ -102,7 +102,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
private RecordTranslator<K, V> translator;
private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
- private int maxRetries = DEFAULT_MAX_RETRIES;
private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
@@ -161,7 +160,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.subscription = builder.subscription;
this.pollTimeoutMs = builder.pollTimeoutMs;
this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
- this.maxRetries = builder.maxRetries;
this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
//this could result in a lot of class case exceptions at runtime,
@@ -331,20 +329,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.offsetCommitPeriodMs = offsetCommitPeriodMs;
return this;
}
-
- /**
- * Defines the max number of retrials in case of tuple failure. The default is to retry forever, which means that
- * no new records are committed until the previous polled records have been acked. This guarantees at once delivery of
- * all the previously polled records.
- * By specifying a finite value for maxRetries, the user decides to sacrifice guarantee of delivery for the previous
- * polled records in favor of processing more records.
- * @param maxRetries max number of retrials
- */
- public Builder<K,V> setMaxRetries(int maxRetries) {
- this.maxRetries = maxRetries;
- return this;
- }
-
+
/**
* Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
* Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
@@ -435,7 +420,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
// Kafka spout configuration
private final RecordTranslator<K, V> translator;
private final long offsetCommitPeriodMs;
- private final int maxRetries;
private final int maxUncommittedOffsets;
private final FirstPollOffsetStrategy firstPollOffsetStrategy;
private final KafkaSpoutRetryService retryService;
@@ -447,7 +431,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.translator = builder.translator;
this.pollTimeoutMs = builder.pollTimeoutMs;
this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
- this.maxRetries = builder.maxRetries;
this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
this.retryService = builder.retryService;
@@ -509,10 +492,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return (String) kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG);
}
- public int getMaxTupleRetries() {
- return maxRetries;
- }
-
public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
return firstPollOffsetStrategy;
}
@@ -537,7 +516,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
", value=" + getValueDeserializer() +
", pollTimeoutMs=" + pollTimeoutMs +
", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
- ", maxRetries=" + maxRetries +
", maxUncommittedOffsets=" + maxUncommittedOffsets +
", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
", subscription=" + subscription +
http://git-wip-us.apache.org/repos/asf/storm/blob/9584eace/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index f59367d..2c8d7e4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -40,25 +40,26 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
- private TimeInterval initialDelay;
- private TimeInterval delayPeriod;
- private TimeInterval maxDelay;
- private int maxRetries;
+ private final TimeInterval initialDelay;
+ private final TimeInterval delayPeriod;
+ private final TimeInterval maxDelay;
+ private final int maxRetries;
- private Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
- private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups
+ private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
+ private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups
/**
* Comparator ordering by timestamp
*/
private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
+ @Override
public int compare(RetrySchedule entry1, RetrySchedule entry2) {
return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
}
}
private class RetrySchedule {
- private KafkaSpoutMessageId msgId;
+ private final KafkaSpoutMessageId msgId;
private long nextRetryTimeNanos;
public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
@@ -94,9 +95,9 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
}
public static class TimeInterval implements Serializable {
- private long lengthNanos;
- private long length;
- private TimeUnit timeUnit;
+ private final long lengthNanos;
+ private final long length;
+ private final TimeUnit timeUnit;
/**
* @param length length of the time interval in the units specified by {@link TimeUnit}
@@ -144,7 +145,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
/**
* The time stamp of the next retry is scheduled according to the exponential backoff formula ( geometric progression):
* nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
- * nextRetry = Min(nextRetry, currentTime + maxDelay)
+ * nextRetry = Min(nextRetry, currentTime + maxDelay).
+ *
+ * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the previous
+ * polled records in favor of processing more records.
*
* @param initialDelay initial delay of the first retry
* @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression)
@@ -239,9 +243,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
}
@Override
- public void schedule(KafkaSpoutMessageId msgId) {
+ public boolean schedule(KafkaSpoutMessageId msgId) {
if (msgId.numFails() > maxRetries) {
LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
+ return false;
} else {
if (toRetryMsgs.contains(msgId)) {
for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
@@ -257,6 +262,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
toRetryMsgs.add(msgId);
LOG.debug("Scheduled. {}", retrySchedule);
LOG.trace("Current state {}", retrySchedules);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9584eace/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index 5aab167..bf17a5a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -29,14 +29,18 @@ import java.util.Set;
*/
public interface KafkaSpoutRetryService extends Serializable {
/**
- * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled.
+ * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or
+ * updates retry time if it has already been scheduled. It may also indicate
+ * that the message should not be retried, in which case the message will not be scheduled.
* @param msgId message to schedule for retrial
+ * @return true if the message will be retried, false otherwise
*/
- void schedule(KafkaSpoutMessageId msgId);
+ boolean schedule(KafkaSpoutMessageId msgId);
/**
* Removes a message from the list of messages scheduled for retrial
* @param msgId message to remove from retrial
+ * @return true if the message was scheduled for retrial, false otherwise
*/
boolean remove(KafkaSpoutMessageId msgId);
@@ -56,8 +60,9 @@ public interface KafkaSpoutRetryService extends Serializable {
Set<TopicPartition> retriableTopicPartitions();
/**
- * Checks if a specific failed {@link KafkaSpoutMessageId} is is ready to be retried,
+ * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
* i.e is scheduled and has retry time that is less than current time.
+ * @param msgId message to check for readiness
* @return true if message is ready to be retried, false otherwise
*/
boolean isReady(KafkaSpoutMessageId msgId);
@@ -65,6 +70,7 @@ public interface KafkaSpoutRetryService extends Serializable {
/**
* Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried.
* The message may or may not be ready to be retried yet.
+ * @param msgId message to check for scheduling status
* @return true if the message is scheduled to be retried, regardless of being or not ready to be retried.
* Returns false is this message is not scheduled for retrial
*/
[3/3] storm git commit: STORM-2014: CHANGELOG
Posted by ka...@apache.org.
STORM-2014: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bd370f56
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bd370f56
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bd370f56
Branch: refs/heads/1.x-branch
Commit: bd370f5686869814902f53e61082685aa477a081
Parents: 06632d6
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 3 11:51:40 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 3 11:51:40 2017 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bd370f56/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 074798c..e7f2664 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-2014: New Kafka spout duplicates checking if failed messages have reached max retries
* STORM-2324: Fix deployment failure if resources directory is missing in topology jar
* STORM-1443: [Storm SQL] Support customizing parallelism in StormSQL
* STORM-2148: [Storm SQL] Trident mode: back to code generate and compile Trident topology
[2/3] storm git commit: Merge branch 'STORM-2014-1.x-merge' into 1.x-branch
Posted by ka...@apache.org.
Merge branch 'STORM-2014-1.x-merge' into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/06632d6a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/06632d6a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/06632d6a
Branch: refs/heads/1.x-branch
Commit: 06632d6a832908d83b60039a8e103e5dbc1c2ee8
Parents: fa436e1 9584eac
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Feb 3 11:50:52 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Feb 3 11:50:52 2017 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 13 ++++-----
.../storm/kafka/spout/KafkaSpoutConfig.java | 26 ++---------------
.../KafkaSpoutRetryExponentialBackoff.java | 30 ++++++++++++--------
.../kafka/spout/KafkaSpoutRetryService.java | 12 ++++++--
4 files changed, 35 insertions(+), 46 deletions(-)
----------------------------------------------------------------------