You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/06/10 15:04:29 UTC
[01/10] storm git commit: STORM-1705: Cap number of retries for a
failed message
Repository: storm
Updated Branches:
refs/heads/1.0.x-branch 0bfa2bc39 -> 53e0dc539
refs/heads/1.x-branch 244b16695 -> 478babe45
refs/heads/master 0b54767e2 -> 582418952
STORM-1705: Cap number of retries for a failed message
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0f1aec56
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0f1aec56
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0f1aec56
Branch: refs/heads/1.x-branch
Commit: 0f1aec566a6ffa6c29da66ea2969bdb27921baf5
Parents: a240df5
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Tue Apr 12 15:28:06 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Tue Apr 12 15:28:06 2016 +0530
----------------------------------------------------------------------
.../ExponentialBackoffMsgRetryManager.java | 38 +++++---
.../storm/kafka/FailedMsgRetryManager.java | 59 ++++++++++--
.../apache/storm/kafka/PartitionManager.java | 32 +++++--
.../jvm/org/apache/storm/kafka/SpoutConfig.java | 2 +
.../ExponentialBackoffMsgRetryManagerTest.java | 99 +++++++++++++-------
5 files changed, 171 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0f1aec56/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index f86d624..7b5f5dd 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -27,17 +27,25 @@ import java.util.concurrent.ConcurrentHashMap;
public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
- private final long retryInitialDelayMs;
- private final double retryDelayMultiplier;
- private final long retryDelayMaxMs;
+ private long retryInitialDelayMs;
+ private double retryDelayMultiplier;
+ private long retryDelayMaxMs;
+ private int retryLimit;
- private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
- private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
+ private Queue<MessageRetryRecord> waiting;
+ private Map<Long,MessageRetryRecord> records;
- public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
- this.retryInitialDelayMs = retryInitialDelayMs;
- this.retryDelayMultiplier = retryDelayMultiplier;
- this.retryDelayMaxMs = retryDelayMaxMs;
+ public ExponentialBackoffMsgRetryManager() {
+
+ }
+
+ public void prepare(SpoutConfig spoutConfig) {
+ this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
+ this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
+ this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
+ this.retryLimit = spoutConfig.retryLimit;
+ this.waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
+ this.records = new ConcurrentHashMap<Long,MessageRetryRecord>();
}
@Override
@@ -86,7 +94,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
@Override
- public boolean shouldRetryMsg(Long offset) {
+ public boolean shouldReEmitMsg(Long offset) {
MessageRetryRecord record = this.records.get(offset);
return record != null &&
this.waiting.contains(record) &&
@@ -94,7 +102,15 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
@Override
- public Set<Long> clearInvalidMessages(Long kafkaOffset) {
+ public boolean retryFurther(Long offset) {
+ MessageRetryRecord record = this.records.get(offset);
+ return ! (record != null &&
+ this.waiting.contains(record) &&
+ this.retryLimit <= record.retryNum);
+ }
+
+ @Override
+ public Set<Long> clearOffsetsBefore(Long kafkaOffset) {
Set<Long> invalidOffsets = new HashSet<Long>();
for(Long offset : records.keySet()){
if(offset < kafkaOffset){
http://git-wip-us.apache.org/repos/asf/storm/blob/0f1aec56/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index e9a7092..9a3b19f 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -17,13 +17,58 @@
*/
package org.apache.storm.kafka;
+import java.io.Serializable;
import java.util.Set;
-public interface FailedMsgRetryManager {
- public void failed(Long offset);
- public void acked(Long offset);
- public void retryStarted(Long offset);
- public Long nextFailedMessageToRetry();
- public boolean shouldRetryMsg(Long offset);
- public Set<Long> clearInvalidMessages(Long kafkaOffset);
+public interface FailedMsgRetryManager extends Serializable {
+
+ /**
+ * Initialization
+ */
+ void prepare(SpoutConfig spoutConfig);
+
+ /**
+ * Message corresponding to the offset failed in kafka spout.
+ * @param offset
+ */
+ void failed(Long offset);
+
+ /**
+ * Message corresponding to the offset, was acked to kafka spout.
+ * @param offset
+ */
+ void acked(Long offset);
+
+ /**
+ * Message corresponding to the offset, has been re-emitted and under transit.
+ * @param offset
+ */
+ void retryStarted(Long offset);
+
+ /**
+ * The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset
+ * and resend them, except completed messages.
+ * @return
+ */
+ Long nextFailedMessageToRetry();
+
+ /**
+ * @param offset
+ * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
+ */
+ boolean shouldReEmitMsg(Long offset);
+
+ /**
+ * Spout will clean up the state for this offset if false is returned.
+ * @param offset
+ * @return True if the message will be retried again. False otherwise.
+ */
+ boolean retryFurther(Long offset);
+
+ /**
+ * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
+ * @param kafkaOffset
+ * @return Set of offsets removed.
+ */
+ Set<Long> clearOffsetsBefore(Long kafkaOffset);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f1aec56/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 4db8af6..d6e51b7 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -46,6 +46,9 @@ public class PartitionManager {
private final CountMetric _fetchAPIMessageCount;
// Count of messages which could not be emitted or retried because they were deleted from kafka
private final CountMetric _lostMessageCount;
+ // Count of messages which were not retried because failedMsgRetryManager didn't consider offset eligible for
+ // retry
+ private final CountMetric _messageIneligibleForRetryCount;
Long _emittedToOffset;
// _pending key = Kafka offset, value = time at which the message was first submitted to the topology
private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
@@ -72,9 +75,14 @@ public class PartitionManager {
_stormConf = stormConf;
numberAcked = numberFailed = 0;
- _failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs,
- _spoutConfig.retryDelayMultiplier,
- _spoutConfig.retryDelayMaxMs);
+ try {
+ _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
+ _failedMsgRetryManager.prepare(spoutConfig);
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
+ FailedMsgRetryManager.class,
+ spoutConfig.failedMsgRetryManagerClass), e);
+ }
String jsonTopologyId = null;
Long jsonOffset = null;
@@ -120,6 +128,7 @@ public class PartitionManager {
_fetchAPICallCount = new CountMetric();
_fetchAPIMessageCount = new CountMetric();
_lostMessageCount = new CountMetric();
+ _messageIneligibleForRetryCount = new CountMetric();
}
public Map getMetricsDataMap() {
@@ -129,6 +138,7 @@ public class PartitionManager {
ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset());
+ ret.put(_partition + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset());
return ret;
}
@@ -197,7 +207,7 @@ public class PartitionManager {
// all the failed offsets, that are earlier than actual EarliestTime
// offset, since they are anyway not there.
// These calls to broker API will be then saved.
- Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(offset);
+ Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);
// Omitted messages have not been acked and may be lost
if (null != omitted) {
@@ -229,14 +239,14 @@ public class PartitionManager {
// Skip any old offsets.
continue;
}
- if (processingNewTuples || this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+ if (processingNewTuples || this._failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
numMessages += 1;
if (!_pending.containsKey(cur_offset)) {
_pending.put(cur_offset, System.currentTimeMillis());
}
_waitingToEmit.add(msg);
_emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
- if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+ if (_failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
this._failedMsgRetryManager.retryStarted(cur_offset);
}
}
@@ -269,7 +279,15 @@ public class PartitionManager {
throw new RuntimeException("Too many tuple failures");
}
- this._failedMsgRetryManager.failed(offset);
+ // Offset may not be considered for retry by failedMsgRetryManager
+ if (this._failedMsgRetryManager.retryFurther(offset)) {
+ this._failedMsgRetryManager.failed(offset);
+ } else {
+ // state for the offset should be cleaned up
+ _messageIneligibleForRetryCount.incr();
+ _pending.remove(offset);
+ this._failedMsgRetryManager.acked(offset);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0f1aec56/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
index 1ac41c8..415ce0a 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
@@ -37,6 +37,8 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;
+ public int retryLimit = Integer.MAX_VALUE;
+ public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
super(hosts, topic);
http://git-wip-us.apache.org/repos/asf/storm/blob/0f1aec56/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index 8fa6564..f2815e2 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -17,14 +17,14 @@
*/
package org.apache.storm.kafka;
+import org.junit.Test;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.junit.Test;
-
public class ExponentialBackoffMsgRetryManagerTest {
private static final Long TEST_OFFSET = 101L;
@@ -34,52 +34,54 @@ public class ExponentialBackoffMsgRetryManagerTest {
@Test
public void testImmediateRetry() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+
+
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
manager.failed(TEST_OFFSET);
next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testSingleDelay() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(100, 1d, 1000);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(100, 1d, 1000, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Thread.sleep(5);
Long next = manager.nextFailedMessageToRetry();
assertNull("expect no message ready for retry yet", next);
- assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
Thread.sleep(100);
next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testExponentialBackoff() throws Exception {
final long initial = 10;
final double mult = 2d;
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, initial * 10);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, initial * 10, Integer.MAX_VALUE);
long expectedWaitTime = initial;
for (long i = 0L; i < 3L; ++i) {
manager.failed(TEST_OFFSET);
Thread.sleep((expectedWaitTime + 1L) / 2L);
- assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
Thread.sleep((expectedWaitTime + 1L) / 2L);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
expectedWaitTime *= mult;
@@ -91,7 +93,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
final long initial = 10;
final double mult = 2d;
final long max = 20;
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Thread.sleep(initial);
@@ -104,8 +106,8 @@ public class ExponentialBackoffMsgRetryManagerTest {
// so TEST_OFFSET2 should come first
Thread.sleep(initial * 2);
- assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
- assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
+ assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
+ assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2));
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next);
@@ -129,27 +131,27 @@ public class ExponentialBackoffMsgRetryManagerTest {
@Test
public void testQueriesAfterRetriedAlready() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
next = manager.nextFailedMessageToRetry();
assertNull("expect no message ready after retried", next);
- assertFalse("message should not be ready after retried", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready after retried", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test(expected = IllegalStateException.class)
public void testRetryWithoutFail() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.retryStarted(TEST_OFFSET);
}
@Test(expected = IllegalStateException.class)
public void testFailRetryRetry() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
try {
manager.retryStarted(TEST_OFFSET);
@@ -157,7 +159,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
fail("IllegalStateException unexpected here: " + ise);
}
- assertFalse("message should not be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
}
@@ -166,19 +168,19 @@ public class ExponentialBackoffMsgRetryManagerTest {
final long initial = 100;
final double mult = 2d;
final long max = 2000;
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, Integer.MAX_VALUE);
long expectedWaitTime = initial;
for (long i = 0L; i < 4L; ++i) {
manager.failed(TEST_OFFSET);
Thread.sleep((expectedWaitTime + 1L) / 2L);
- assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
Thread.sleep((expectedWaitTime + 1L) / 2L);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max);
@@ -187,42 +189,42 @@ public class ExponentialBackoffMsgRetryManagerTest {
@Test
public void testFailThenAck() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.acked(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertNull("expect no message ready after acked", next);
- assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready after acked", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testAckThenFail() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.acked(TEST_OFFSET);
- assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready after acked", manager.shouldReEmitMsg(TEST_OFFSET));
manager.failed(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testClearInvalidMessages() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
manager.failed(TEST_OFFSET2);
manager.failed(TEST_OFFSET3);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET3));
- manager.clearInvalidMessages(TEST_NEW_OFFSET);
+ manager.clearOffsetsBefore(TEST_NEW_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET3, next);
@@ -232,4 +234,33 @@ public class ExponentialBackoffMsgRetryManagerTest {
assertNull("expect no message ready after acked", next);
}
+ @Test
+ public void testMaxRetry() throws Exception {
+ final long initial = 100;
+ final double mult = 2d;
+ final long max = 2000;
+ final int maxRetries = 2;
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, maxRetries);
+ assertTrue(manager.retryFurther(TEST_OFFSET));
+ manager.failed(TEST_OFFSET);
+
+ assertTrue(manager.retryFurther(TEST_OFFSET));
+ manager.failed(TEST_OFFSET);
+
+ assertFalse(manager.retryFurther(TEST_OFFSET));
+ }
+
+ private ExponentialBackoffMsgRetryManager buildExponentialBackoffMsgRetryManager(long retryInitialDelayMs,
+ double retryDelayMultiplier,
+ long retryDelayMaxMs,
+ int retryLimit) {
+ SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null);
+ spoutConfig.retryInitialDelayMs = retryInitialDelayMs;
+ spoutConfig.retryDelayMultiplier = retryDelayMultiplier;
+ spoutConfig.retryDelayMaxMs = retryDelayMaxMs;
+ spoutConfig.retryLimit = retryLimit;
+ ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager();
+ exponentialBackoffMsgRetryManager.prepare(spoutConfig);
+ return exponentialBackoffMsgRetryManager;
+ }
}
[10/10] storm git commit: add STORM-1705 to changelog
Posted by pt...@apache.org.
add STORM-1705 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/53e0dc53
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/53e0dc53
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/53e0dc53
Branch: refs/heads/1.0.x-branch
Commit: 53e0dc53945e68b856da5e62dcfd5327485b09e4
Parents: e3ad5ac
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Jun 10 10:38:39 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jun 10 10:38:39 2016 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/53e0dc53/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ea77a2e..1530cec 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.0.2
+ * STORM-1705: Cap number of retries for a failed message
* STORM-1884: Prioritize pendingPrepare over pendingCommit
* STORM-1575: fix TwitterSampleSpout NPE on close
* STORM-1874: Update logger private permissions
[03/10] storm git commit: Merge branch 'cap-retry' of
https://github.com/abhishekagarwal87/storm into 1.x-branch
Posted by pt...@apache.org.
Merge branch 'cap-retry' of https://github.com/abhishekagarwal87/storm into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2d0fb215
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2d0fb215
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2d0fb215
Branch: refs/heads/1.x-branch
Commit: 2d0fb215bb101d81d6dfd2092705dd00d51ad4a4
Parents: 244b166 5c706af
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Jun 10 10:25:32 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jun 10 10:25:32 2016 -0400
----------------------------------------------------------------------
external/storm-kafka/README.md | 55 ++++++++++-
.../ExponentialBackoffMsgRetryManager.java | 39 +++++---
.../storm/kafka/FailedMsgRetryManager.java | 60 ++++++++++--
.../apache/storm/kafka/PartitionManager.java | 33 +++++--
.../jvm/org/apache/storm/kafka/SpoutConfig.java | 8 +-
.../ExponentialBackoffMsgRetryManagerTest.java | 99 +++++++++++++-------
6 files changed, 229 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2d0fb215/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
[07/10] storm git commit: STORM-1705: Update storm-kafka readme
Posted by pt...@apache.org.
STORM-1705: Update storm-kafka readme
Signed-off-by: P. Taylor Goetz <pt...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b23febc3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b23febc3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b23febc3
Branch: refs/heads/master
Commit: b23febc39c4ba3f6f0e7eecfa0969cd958dc969a
Parents: fd51eac
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Thu Jun 9 18:36:49 2016 +0530
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jun 10 10:36:15 2016 -0400
----------------------------------------------------------------------
external/storm-kafka/README.md | 55 ++++++++++++++++++--
.../ExponentialBackoffMsgRetryManager.java | 3 +-
.../storm/kafka/FailedMsgRetryManager.java | 3 +-
.../apache/storm/kafka/PartitionManager.java | 3 +-
.../jvm/org/apache/storm/kafka/SpoutConfig.java | 10 ++--
.../ExponentialBackoffMsgRetryManagerTest.java | 2 +-
6 files changed, 64 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b23febc3/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 7fb2757..7db8675 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -62,13 +62,19 @@ In addition to these parameters, SpoutConfig contains the following fields that
// setting for how often to save the current Kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;
- // Exponential back-off retry settings. These are used when retrying messages after a bolt
- // calls OutputCollector.fail().
- // Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
- // resubmitting the message while still retrying.
+ // Retry strategy for failed messages
+ public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+
+ // Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
+ // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
+ // Initial delay between successive retries
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
+
+ // Maximum delay between successive retries
public long retryDelayMaxMs = 60 * 1000;
+ // Failed message will be retried infinitely if retryLimit is less than zero.
+ public int retryLimit = -1;
```
Core KafkaSpout only accepts an instance of SpoutConfig.
@@ -112,6 +118,47 @@ public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message,
This is useful for auditing/replaying messages from arbitrary points on a Kafka topic, saving the partition and offset of each message of a discrete stream instead of persisting the entire message.
+###Failed message retry
+FailedMsgRetryManager is an interface which defines the retry strategy for a failed message. Default implementation is ExponentialBackoffMsgRetryManager which retries with exponential delays
+between consecutive retries. To use a custom implementation, set SpoutConfig.failedMsgRetryManagerClass to the full classname
+of implementation. Here is the interface
+
+```java
+ // Spout initialization can go here. This can be called multiple times during lifecycle of a worker.
+ void prepare(SpoutConfig spoutConfig, Map stormConf);
+
+ // Message corresponding to offset has failed. This method is called only if retryFurther returns true for offset.
+ void failed(Long offset);
+
+ // Message corresponding to offset has been acked.
+ void acked(Long offset);
+
+ // Message corresponding to the offset, has been re-emitted and under transit.
+ void retryStarted(Long offset);
+
+ /**
+ * The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset
+ * and resend them, except completed messages.
+ */
+ Long nextFailedMessageToRetry();
+
+ /**
+ * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
+ */
+ boolean shouldReEmitMsg(Long offset);
+
+ /**
+ * Spout will clean up the state for this offset if false is returned. If retryFurther is set to true,
+ * spout will called failed(offset) in next call and acked(offset) otherwise
+ */
+ boolean retryFurther(Long offset);
+
+ /**
+ * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
+ */
+ Set<Long> clearOffsetsBefore(Long kafkaOffset);
+```
+
#### Version incompatibility
In Storm versions prior to 1.0, the MultiScheme methods accepted a `byte[]` instead of `ByteBuffer`. The `MultScheme` and the related
Scheme apis were changed in version 1.0 to accept a ByteBuffer instead of a byte[].
http://git-wip-us.apache.org/repos/asf/storm/blob/b23febc3/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index 7b5f5dd..b2cfaf0 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -39,7 +39,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
- public void prepare(SpoutConfig spoutConfig) {
+ public void prepare(SpoutConfig spoutConfig, Map stormConf) {
this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
@@ -105,6 +105,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
public boolean retryFurther(Long offset) {
MessageRetryRecord record = this.records.get(offset);
return ! (record != null &&
+ this.retryLimit > 0 &&
this.waiting.contains(record) &&
this.retryLimit <= record.retryNum);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b23febc3/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index 9a3b19f..5e2cc5f 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -18,6 +18,7 @@
package org.apache.storm.kafka;
import java.io.Serializable;
+import java.util.Map;
import java.util.Set;
public interface FailedMsgRetryManager extends Serializable {
@@ -25,7 +26,7 @@ public interface FailedMsgRetryManager extends Serializable {
/**
* Initialization
*/
- void prepare(SpoutConfig spoutConfig);
+ void prepare(SpoutConfig spoutConfig, Map stormConf);
/**
* Message corresponding to the offset failed in kafka spout.
http://git-wip-us.apache.org/repos/asf/storm/blob/b23febc3/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index f42b8f2..c51f36e 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -78,7 +78,7 @@ public class PartitionManager {
try {
_failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
- _failedMsgRetryManager.prepare(spoutConfig);
+ _failedMsgRetryManager.prepare(spoutConfig, _stormConf);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
FailedMsgRetryManager.class,
@@ -288,6 +288,7 @@ public class PartitionManager {
this._failedMsgRetryManager.failed(offset);
} else {
// state for the offset should be cleaned up
+ LOG.warn("Will not retry failed kafka offset {} further", offset);
_messageIneligibleForRetryCount.incr();
_pending.remove(offset);
this._failedMsgRetryManager.acked(offset);
http://git-wip-us.apache.org/repos/asf/storm/blob/b23febc3/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
index 415ce0a..aa93c24 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
@@ -32,13 +32,15 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
// setting for how often to save the current kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;
- // Exponential back-off retry settings. These are used when retrying messages after a bolt
- // calls OutputCollector.fail().
+ // Retry strategy for failed messages
+ public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+
+ // Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
+ // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;
- public int retryLimit = Integer.MAX_VALUE;
- public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+ public int retryLimit = -1;
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
super(hosts, topic);
http://git-wip-us.apache.org/repos/asf/storm/blob/b23febc3/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index f2815e2..2f53612 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -260,7 +260,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
spoutConfig.retryDelayMaxMs = retryDelayMaxMs;
spoutConfig.retryLimit = retryLimit;
ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager();
- exponentialBackoffMsgRetryManager.prepare(spoutConfig);
+ exponentialBackoffMsgRetryManager.prepare(spoutConfig, null);
return exponentialBackoffMsgRetryManager;
}
}
[09/10] storm git commit: add STORM-1705 to changelog
Posted by pt...@apache.org.
add STORM-1705 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/478babe4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/478babe4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/478babe4
Branch: refs/heads/1.x-branch
Commit: 478babe4540c00fc68275faad8c2806851a2ccdd
Parents: 2d0fb21
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Jun 10 10:38:20 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jun 10 10:38:20 2016 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/478babe4/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c7b2226..e6830d9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-1705: Cap number of retries for a failed message
* STORM-1884: Prioritize pendingPrepare over pendingCommit
* STORM-1575: fix TwitterSampleSpout NPE on close
* STORM-1874: Update logger private permissions
[08/10] storm git commit: add STORM-1705 to changelog
Posted by pt...@apache.org.
add STORM-1705 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/58241895
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/58241895
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/58241895
Branch: refs/heads/master
Commit: 582418952d130228789c6e523ffc1190fd5b4d20
Parents: b23febc
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Jun 10 10:37:40 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jun 10 10:37:40 2016 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/58241895/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f194cee..0b44fd1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-1705: Cap number of retries for a failed message
* STORM-1884: Prioritize pendingPrepare over pendingCommit
* STORM-1575: fix TwitterSampleSpout NPE on close
* STORM-1874: Update logger private permissions
[02/10] storm git commit: STORM-1705: Update storm-kafka readme
Posted by pt...@apache.org.
STORM-1705: Update storm-kafka readme
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5c706afc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5c706afc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5c706afc
Branch: refs/heads/1.x-branch
Commit: 5c706afcb0922b891f1eda0f5483c89272491d12
Parents: 0f1aec5
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Thu Jun 9 18:36:49 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Thu Jun 9 18:36:49 2016 +0530
----------------------------------------------------------------------
external/storm-kafka/README.md | 55 ++++++++++++++++++--
.../ExponentialBackoffMsgRetryManager.java | 3 +-
.../storm/kafka/FailedMsgRetryManager.java | 3 +-
.../apache/storm/kafka/PartitionManager.java | 3 +-
.../jvm/org/apache/storm/kafka/SpoutConfig.java | 10 ++--
.../ExponentialBackoffMsgRetryManagerTest.java | 2 +-
6 files changed, 64 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 5a34b55..91a6553 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -63,13 +63,19 @@ In addition to these parameters, SpoutConfig contains the following fields that
// setting for how often to save the current Kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;
- // Exponential back-off retry settings. These are used when retrying messages after a bolt
- // calls OutputCollector.fail().
- // Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
- // resubmitting the message while still retrying.
+ // Retry strategy for failed messages
+ public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+
+ // Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
+ // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
+ // Initial delay between successive retries
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
+
+ // Maximum delay between successive retries
public long retryDelayMaxMs = 60 * 1000;
+ // Failed message will be retried infinitely if retryLimit is less than zero.
+ public int retryLimit = -1;
```
Core KafkaSpout only accepts an instance of SpoutConfig.
@@ -113,6 +119,47 @@ public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message,
This is useful for auditing/replaying messages from arbitrary points on a Kafka topic, saving the partition and offset of each message of a discrete stream instead of persisting the entire message.
+###Failed message retry
+FailedMsgRetryManager is an interface which defines the retry strategy for a failed message. Default implementation is ExponentialBackoffMsgRetryManager which retries with exponential delays
+between consecutive retries. To use a custom implementation, set SpoutConfig.failedMsgRetryManagerClass to the full classname
+of implementation. Here is the interface
+
+```java
+ // Spout initialization can go here. This can be called multiple times during lifecycle of a worker.
+ void prepare(SpoutConfig spoutConfig, Map stormConf);
+
+ // Message corresponding to offset has failed. This method is called only if retryFurther returns true for offset.
+ void failed(Long offset);
+
+ // Message corresponding to offset has been acked.
+ void acked(Long offset);
+
+ // Message corresponding to the offset, has been re-emitted and under transit.
+ void retryStarted(Long offset);
+
+ /**
+ * The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset
+ * and resend them, except completed messages.
+ */
+ Long nextFailedMessageToRetry();
+
+ /**
+ * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
+ */
+ boolean shouldReEmitMsg(Long offset);
+
+ /**
+ * Spout will clean up the state for this offset if false is returned. If retryFurther is set to true,
+ * spout will called failed(offset) in next call and acked(offset) otherwise
+ */
+ boolean retryFurther(Long offset);
+
+ /**
+ * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
+ */
+ Set<Long> clearOffsetsBefore(Long kafkaOffset);
+```
+
#### Version incompatibility
In Storm versions prior to 1.0, the MultiScheme methods accepted a `byte[]` instead of `ByteBuffer`. The `MultScheme` and the related
Scheme apis were changed in version 1.0 to accept a ByteBuffer instead of a byte[].
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index 7b5f5dd..b2cfaf0 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -39,7 +39,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
- public void prepare(SpoutConfig spoutConfig) {
+ public void prepare(SpoutConfig spoutConfig, Map stormConf) {
this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
@@ -105,6 +105,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
public boolean retryFurther(Long offset) {
MessageRetryRecord record = this.records.get(offset);
return ! (record != null &&
+ this.retryLimit > 0 &&
this.waiting.contains(record) &&
this.retryLimit <= record.retryNum);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index 9a3b19f..5e2cc5f 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -18,6 +18,7 @@
package org.apache.storm.kafka;
import java.io.Serializable;
+import java.util.Map;
import java.util.Set;
public interface FailedMsgRetryManager extends Serializable {
@@ -25,7 +26,7 @@ public interface FailedMsgRetryManager extends Serializable {
/**
* Initialization
*/
- void prepare(SpoutConfig spoutConfig);
+ void prepare(SpoutConfig spoutConfig, Map stormConf);
/**
* Message corresponding to the offset failed in kafka spout.
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index d6e51b7..b65f053 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -77,7 +77,7 @@ public class PartitionManager {
try {
_failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
- _failedMsgRetryManager.prepare(spoutConfig);
+ _failedMsgRetryManager.prepare(spoutConfig, _stormConf);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
FailedMsgRetryManager.class,
@@ -284,6 +284,7 @@ public class PartitionManager {
this._failedMsgRetryManager.failed(offset);
} else {
// state for the offset should be cleaned up
+ LOG.warn("Will not retry failed kafka offset {} further", offset);
_messageIneligibleForRetryCount.incr();
_pending.remove(offset);
this._failedMsgRetryManager.acked(offset);
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
index 415ce0a..aa93c24 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
@@ -32,13 +32,15 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
// setting for how often to save the current kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;
- // Exponential back-off retry settings. These are used when retrying messages after a bolt
- // calls OutputCollector.fail().
+ // Retry strategy for failed messages
+ public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+
+ // Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
+ // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;
- public int retryLimit = Integer.MAX_VALUE;
- public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+ public int retryLimit = -1;
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
super(hosts, topic);
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index f2815e2..2f53612 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -260,7 +260,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
spoutConfig.retryDelayMaxMs = retryDelayMaxMs;
spoutConfig.retryLimit = retryLimit;
ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager();
- exponentialBackoffMsgRetryManager.prepare(spoutConfig);
+ exponentialBackoffMsgRetryManager.prepare(spoutConfig, null);
return exponentialBackoffMsgRetryManager;
}
}
[05/10] storm git commit: STORM-1705: Update storm-kafka readme
Posted by pt...@apache.org.
STORM-1705: Update storm-kafka readme
Signed-off-by: P. Taylor Goetz <pt...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e3ad5acb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e3ad5acb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e3ad5acb
Branch: refs/heads/1.0.x-branch
Commit: e3ad5acbc80b1060f73d41a77ca91c2d7efdc069
Parents: 567683f
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Thu Jun 9 18:36:49 2016 +0530
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jun 10 10:35:43 2016 -0400
----------------------------------------------------------------------
external/storm-kafka/README.md | 55 ++++++++++++++++++--
.../ExponentialBackoffMsgRetryManager.java | 3 +-
.../storm/kafka/FailedMsgRetryManager.java | 3 +-
.../apache/storm/kafka/PartitionManager.java | 3 +-
.../jvm/org/apache/storm/kafka/SpoutConfig.java | 10 ++--
.../ExponentialBackoffMsgRetryManagerTest.java | 2 +-
6 files changed, 64 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e3ad5acb/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 5a34b55..91a6553 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -63,13 +63,19 @@ In addition to these parameters, SpoutConfig contains the following fields that
// setting for how often to save the current Kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;
- // Exponential back-off retry settings. These are used when retrying messages after a bolt
- // calls OutputCollector.fail().
- // Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
- // resubmitting the message while still retrying.
+ // Retry strategy for failed messages
+ public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+
+ // Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
+ // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
+ // Initial delay between successive retries
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
+
+ // Maximum delay between successive retries
public long retryDelayMaxMs = 60 * 1000;
+ // Failed message will be retried infinitely if retryLimit is less than zero.
+ public int retryLimit = -1;
```
Core KafkaSpout only accepts an instance of SpoutConfig.
@@ -113,6 +119,47 @@ public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message,
This is useful for auditing/replaying messages from arbitrary points on a Kafka topic, saving the partition and offset of each message of a discrete stream instead of persisting the entire message.
+###Failed message retry
+FailedMsgRetryManager is an interface which defines the retry strategy for a failed message. Default implementation is ExponentialBackoffMsgRetryManager which retries with exponential delays
+between consecutive retries. To use a custom implementation, set SpoutConfig.failedMsgRetryManagerClass to the full classname
+of implementation. Here is the interface
+
+```java
+ // Spout initialization can go here. This can be called multiple times during lifecycle of a worker.
+ void prepare(SpoutConfig spoutConfig, Map stormConf);
+
+ // Message corresponding to offset has failed. This method is called only if retryFurther returns true for offset.
+ void failed(Long offset);
+
+ // Message corresponding to offset has been acked.
+ void acked(Long offset);
+
+ // Message corresponding to the offset, has been re-emitted and under transit.
+ void retryStarted(Long offset);
+
+ /**
+ * The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset
+ * and resend them, except completed messages.
+ */
+ Long nextFailedMessageToRetry();
+
+ /**
+ * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
+ */
+ boolean shouldReEmitMsg(Long offset);
+
+ /**
+ * Spout will clean up the state for this offset if false is returned. If retryFurther is set to true,
+ * spout will called failed(offset) in next call and acked(offset) otherwise
+ */
+ boolean retryFurther(Long offset);
+
+ /**
+ * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
+ */
+ Set<Long> clearOffsetsBefore(Long kafkaOffset);
+```
+
#### Version incompatibility
In Storm versions prior to 1.0, the MultiScheme methods accepted a `byte[]` instead of `ByteBuffer`. The `MultScheme` and the related
Scheme apis were changed in version 1.0 to accept a ByteBuffer instead of a byte[].
http://git-wip-us.apache.org/repos/asf/storm/blob/e3ad5acb/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index 7b5f5dd..b2cfaf0 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -39,7 +39,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
- public void prepare(SpoutConfig spoutConfig) {
+ public void prepare(SpoutConfig spoutConfig, Map stormConf) {
this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
@@ -105,6 +105,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
public boolean retryFurther(Long offset) {
MessageRetryRecord record = this.records.get(offset);
return ! (record != null &&
+ this.retryLimit > 0 &&
this.waiting.contains(record) &&
this.retryLimit <= record.retryNum);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e3ad5acb/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index 9a3b19f..5e2cc5f 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -18,6 +18,7 @@
package org.apache.storm.kafka;
import java.io.Serializable;
+import java.util.Map;
import java.util.Set;
public interface FailedMsgRetryManager extends Serializable {
@@ -25,7 +26,7 @@ public interface FailedMsgRetryManager extends Serializable {
/**
* Initialization
*/
- void prepare(SpoutConfig spoutConfig);
+ void prepare(SpoutConfig spoutConfig, Map stormConf);
/**
* Message corresponding to the offset failed in kafka spout.
http://git-wip-us.apache.org/repos/asf/storm/blob/e3ad5acb/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 16bdccf..e04b4f2 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -78,7 +78,7 @@ public class PartitionManager {
try {
_failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
- _failedMsgRetryManager.prepare(spoutConfig);
+ _failedMsgRetryManager.prepare(spoutConfig, _stormConf);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
FailedMsgRetryManager.class,
@@ -289,6 +289,7 @@ public class PartitionManager {
this._failedMsgRetryManager.failed(offset);
} else {
// state for the offset should be cleaned up
+ LOG.warn("Will not retry failed kafka offset {} further", offset);
_messageIneligibleForRetryCount.incr();
_pending.remove(offset);
this._failedMsgRetryManager.acked(offset);
http://git-wip-us.apache.org/repos/asf/storm/blob/e3ad5acb/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
index 415ce0a..aa93c24 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
@@ -32,13 +32,15 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
// setting for how often to save the current kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;
- // Exponential back-off retry settings. These are used when retrying messages after a bolt
- // calls OutputCollector.fail().
+ // Retry strategy for failed messages
+ public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+
+ // Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
+ // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;
- public int retryLimit = Integer.MAX_VALUE;
- public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+ public int retryLimit = -1;
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
super(hosts, topic);
http://git-wip-us.apache.org/repos/asf/storm/blob/e3ad5acb/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index f2815e2..2f53612 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -260,7 +260,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
spoutConfig.retryDelayMaxMs = retryDelayMaxMs;
spoutConfig.retryLimit = retryLimit;
ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager();
- exponentialBackoffMsgRetryManager.prepare(spoutConfig);
+ exponentialBackoffMsgRetryManager.prepare(spoutConfig, null);
return exponentialBackoffMsgRetryManager;
}
}
[04/10] storm git commit: STORM-1705: Cap number of retries for a
failed message
Posted by pt...@apache.org.
STORM-1705: Cap number of retries for a failed message
Signed-off-by: P. Taylor Goetz <pt...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/567683f3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/567683f3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/567683f3
Branch: refs/heads/1.0.x-branch
Commit: 567683f3ac44af23afc6ab0b7a9877ec82957618
Parents: 0bfa2bc
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Tue Apr 12 15:28:06 2016 +0530
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jun 10 10:35:43 2016 -0400
----------------------------------------------------------------------
.../ExponentialBackoffMsgRetryManager.java | 38 +++++---
.../storm/kafka/FailedMsgRetryManager.java | 59 ++++++++++--
.../apache/storm/kafka/PartitionManager.java | 32 +++++--
.../jvm/org/apache/storm/kafka/SpoutConfig.java | 2 +
.../ExponentialBackoffMsgRetryManagerTest.java | 99 +++++++++++++-------
5 files changed, 171 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/567683f3/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index f86d624..7b5f5dd 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -27,17 +27,25 @@ import java.util.concurrent.ConcurrentHashMap;
public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
- private final long retryInitialDelayMs;
- private final double retryDelayMultiplier;
- private final long retryDelayMaxMs;
+ private long retryInitialDelayMs;
+ private double retryDelayMultiplier;
+ private long retryDelayMaxMs;
+ private int retryLimit;
- private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
- private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
+ private Queue<MessageRetryRecord> waiting;
+ private Map<Long,MessageRetryRecord> records;
- public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
- this.retryInitialDelayMs = retryInitialDelayMs;
- this.retryDelayMultiplier = retryDelayMultiplier;
- this.retryDelayMaxMs = retryDelayMaxMs;
+ public ExponentialBackoffMsgRetryManager() {
+
+ }
+
+ public void prepare(SpoutConfig spoutConfig) {
+ this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
+ this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
+ this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
+ this.retryLimit = spoutConfig.retryLimit;
+ this.waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
+ this.records = new ConcurrentHashMap<Long,MessageRetryRecord>();
}
@Override
@@ -86,7 +94,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
@Override
- public boolean shouldRetryMsg(Long offset) {
+ public boolean shouldReEmitMsg(Long offset) {
MessageRetryRecord record = this.records.get(offset);
return record != null &&
this.waiting.contains(record) &&
@@ -94,7 +102,15 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
@Override
- public Set<Long> clearInvalidMessages(Long kafkaOffset) {
+ public boolean retryFurther(Long offset) {
+ MessageRetryRecord record = this.records.get(offset);
+ return ! (record != null &&
+ this.waiting.contains(record) &&
+ this.retryLimit <= record.retryNum);
+ }
+
+ @Override
+ public Set<Long> clearOffsetsBefore(Long kafkaOffset) {
Set<Long> invalidOffsets = new HashSet<Long>();
for(Long offset : records.keySet()){
if(offset < kafkaOffset){
http://git-wip-us.apache.org/repos/asf/storm/blob/567683f3/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index e9a7092..9a3b19f 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -17,13 +17,58 @@
*/
package org.apache.storm.kafka;
+import java.io.Serializable;
import java.util.Set;
-public interface FailedMsgRetryManager {
- public void failed(Long offset);
- public void acked(Long offset);
- public void retryStarted(Long offset);
- public Long nextFailedMessageToRetry();
- public boolean shouldRetryMsg(Long offset);
- public Set<Long> clearInvalidMessages(Long kafkaOffset);
+public interface FailedMsgRetryManager extends Serializable {
+
+ /**
+ * Initialization
+ */
+ void prepare(SpoutConfig spoutConfig);
+
+ /**
+ * Message corresponding to the offset failed in kafka spout.
+ * @param offset
+ */
+ void failed(Long offset);
+
+ /**
+ * Message corresponding to the offset, was acked to kafka spout.
+ * @param offset
+ */
+ void acked(Long offset);
+
+ /**
+ * Message corresponding to the offset, has been re-emitted and under transit.
+ * @param offset
+ */
+ void retryStarted(Long offset);
+
+ /**
+ * The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset
+ * and resend them, except completed messages.
+ * @return
+ */
+ Long nextFailedMessageToRetry();
+
+ /**
+ * @param offset
+ * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
+ */
+ boolean shouldReEmitMsg(Long offset);
+
+ /**
+ * Spout will clean up the state for this offset if false is returned.
+ * @param offset
+ * @return True if the message will be retried again. False otherwise.
+ */
+ boolean retryFurther(Long offset);
+
+ /**
+ * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
+ * @param kafkaOffset
+ * @return Set of offsets removed.
+ */
+ Set<Long> clearOffsetsBefore(Long kafkaOffset);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/567683f3/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index f9d8cf6..16bdccf 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -47,6 +47,9 @@ public class PartitionManager {
private final CountMetric _fetchAPIMessageCount;
// Count of messages which could not be emitted or retried because they were deleted from kafka
private final CountMetric _lostMessageCount;
+ // Count of messages which were not retried because failedMsgRetryManager didn't consider offset eligible for
+ // retry
+ private final CountMetric _messageIneligibleForRetryCount;
Long _emittedToOffset;
// _pending key = Kafka offset, value = time at which the message was first submitted to the topology
private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
@@ -73,9 +76,14 @@ public class PartitionManager {
_stormConf = stormConf;
numberAcked = numberFailed = 0;
- _failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs,
- _spoutConfig.retryDelayMultiplier,
- _spoutConfig.retryDelayMaxMs);
+ try {
+ _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
+ _failedMsgRetryManager.prepare(spoutConfig);
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
+ FailedMsgRetryManager.class,
+ spoutConfig.failedMsgRetryManagerClass), e);
+ }
String jsonTopologyId = null;
Long jsonOffset = null;
@@ -121,6 +129,7 @@ public class PartitionManager {
_fetchAPICallCount = new CountMetric();
_fetchAPIMessageCount = new CountMetric();
_lostMessageCount = new CountMetric();
+ _messageIneligibleForRetryCount = new CountMetric();
}
public Map getMetricsDataMap() {
@@ -130,6 +139,7 @@ public class PartitionManager {
ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset());
+ ret.put(_partition + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset());
return ret;
}
@@ -198,7 +208,7 @@ public class PartitionManager {
// all the failed offsets, that are earlier than actual EarliestTime
// offset, since they are anyway not there.
// These calls to broker API will be then saved.
- Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(offset);
+ Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);
// Omitted messages have not been acked and may be lost
if (null != omitted) {
@@ -230,14 +240,14 @@ public class PartitionManager {
// Skip any old offsets.
continue;
}
- if (processingNewTuples || this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+ if (processingNewTuples || this._failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
numMessages += 1;
if (!_pending.containsKey(cur_offset)) {
_pending.put(cur_offset, System.currentTimeMillis());
}
_waitingToEmit.add(msg);
_emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
- if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+ if (_failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
this._failedMsgRetryManager.retryStarted(cur_offset);
}
}
@@ -274,7 +284,15 @@ public class PartitionManager {
throw new RuntimeException("Too many tuple failures");
}
- this._failedMsgRetryManager.failed(offset);
+ // Offset may not be considered for retry by failedMsgRetryManager
+ if (this._failedMsgRetryManager.retryFurther(offset)) {
+ this._failedMsgRetryManager.failed(offset);
+ } else {
+ // state for the offset should be cleaned up
+ _messageIneligibleForRetryCount.incr();
+ _pending.remove(offset);
+ this._failedMsgRetryManager.acked(offset);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/567683f3/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
index 1ac41c8..415ce0a 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
@@ -37,6 +37,8 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;
+ public int retryLimit = Integer.MAX_VALUE;
+ public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
super(hosts, topic);
http://git-wip-us.apache.org/repos/asf/storm/blob/567683f3/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index 8fa6564..f2815e2 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -17,14 +17,14 @@
*/
package org.apache.storm.kafka;
+import org.junit.Test;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.junit.Test;
-
public class ExponentialBackoffMsgRetryManagerTest {
private static final Long TEST_OFFSET = 101L;
@@ -34,52 +34,54 @@ public class ExponentialBackoffMsgRetryManagerTest {
@Test
public void testImmediateRetry() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+
+
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
manager.failed(TEST_OFFSET);
next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testSingleDelay() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(100, 1d, 1000);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(100, 1d, 1000, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Thread.sleep(5);
Long next = manager.nextFailedMessageToRetry();
assertNull("expect no message ready for retry yet", next);
- assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
Thread.sleep(100);
next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testExponentialBackoff() throws Exception {
final long initial = 10;
final double mult = 2d;
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, initial * 10);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, initial * 10, Integer.MAX_VALUE);
long expectedWaitTime = initial;
for (long i = 0L; i < 3L; ++i) {
manager.failed(TEST_OFFSET);
Thread.sleep((expectedWaitTime + 1L) / 2L);
- assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
Thread.sleep((expectedWaitTime + 1L) / 2L);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
expectedWaitTime *= mult;
@@ -91,7 +93,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
final long initial = 10;
final double mult = 2d;
final long max = 20;
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Thread.sleep(initial);
@@ -104,8 +106,8 @@ public class ExponentialBackoffMsgRetryManagerTest {
// so TEST_OFFSET2 should come first
Thread.sleep(initial * 2);
- assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
- assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
+ assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
+ assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2));
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next);
@@ -129,27 +131,27 @@ public class ExponentialBackoffMsgRetryManagerTest {
@Test
public void testQueriesAfterRetriedAlready() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
next = manager.nextFailedMessageToRetry();
assertNull("expect no message ready after retried", next);
- assertFalse("message should not be ready after retried", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready after retried", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test(expected = IllegalStateException.class)
public void testRetryWithoutFail() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.retryStarted(TEST_OFFSET);
}
@Test(expected = IllegalStateException.class)
public void testFailRetryRetry() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
try {
manager.retryStarted(TEST_OFFSET);
@@ -157,7 +159,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
fail("IllegalStateException unexpected here: " + ise);
}
- assertFalse("message should not be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
}
@@ -166,19 +168,19 @@ public class ExponentialBackoffMsgRetryManagerTest {
final long initial = 100;
final double mult = 2d;
final long max = 2000;
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, Integer.MAX_VALUE);
long expectedWaitTime = initial;
for (long i = 0L; i < 4L; ++i) {
manager.failed(TEST_OFFSET);
Thread.sleep((expectedWaitTime + 1L) / 2L);
- assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
Thread.sleep((expectedWaitTime + 1L) / 2L);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max);
@@ -187,42 +189,42 @@ public class ExponentialBackoffMsgRetryManagerTest {
@Test
public void testFailThenAck() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.acked(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertNull("expect no message ready after acked", next);
- assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready after acked", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testAckThenFail() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.acked(TEST_OFFSET);
- assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready after acked", manager.shouldReEmitMsg(TEST_OFFSET));
manager.failed(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testClearInvalidMessages() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
manager.failed(TEST_OFFSET2);
manager.failed(TEST_OFFSET3);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET3));
- manager.clearInvalidMessages(TEST_NEW_OFFSET);
+ manager.clearOffsetsBefore(TEST_NEW_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET3, next);
@@ -232,4 +234,33 @@ public class ExponentialBackoffMsgRetryManagerTest {
assertNull("expect no message ready after acked", next);
}
+ @Test
+ public void testMaxRetry() throws Exception {
+ final long initial = 100;
+ final double mult = 2d;
+ final long max = 2000;
+ final int maxRetries = 2;
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, maxRetries);
+ assertTrue(manager.retryFurther(TEST_OFFSET));
+ manager.failed(TEST_OFFSET);
+
+ assertTrue(manager.retryFurther(TEST_OFFSET));
+ manager.failed(TEST_OFFSET);
+
+ assertFalse(manager.retryFurther(TEST_OFFSET));
+ }
+
+ private ExponentialBackoffMsgRetryManager buildExponentialBackoffMsgRetryManager(long retryInitialDelayMs,
+ double retryDelayMultiplier,
+ long retryDelayMaxMs,
+ int retryLimit) {
+ SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null);
+ spoutConfig.retryInitialDelayMs = retryInitialDelayMs;
+ spoutConfig.retryDelayMultiplier = retryDelayMultiplier;
+ spoutConfig.retryDelayMaxMs = retryDelayMaxMs;
+ spoutConfig.retryLimit = retryLimit;
+ ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager();
+ exponentialBackoffMsgRetryManager.prepare(spoutConfig);
+ return exponentialBackoffMsgRetryManager;
+ }
}
[06/10] storm git commit: STORM-1705: Cap number of retries for a
failed message
Posted by pt...@apache.org.
STORM-1705: Cap number of retries for a failed message
Signed-off-by: P. Taylor Goetz <pt...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd51eacb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd51eacb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd51eacb
Branch: refs/heads/master
Commit: fd51eacb58c52d7ed35ecbf52b7e6a9509fe1a7f
Parents: 0b54767
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Tue Apr 12 15:28:06 2016 +0530
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jun 10 10:36:15 2016 -0400
----------------------------------------------------------------------
.../ExponentialBackoffMsgRetryManager.java | 38 +++++---
.../storm/kafka/FailedMsgRetryManager.java | 59 ++++++++++--
.../apache/storm/kafka/PartitionManager.java | 32 +++++--
.../jvm/org/apache/storm/kafka/SpoutConfig.java | 2 +
.../ExponentialBackoffMsgRetryManagerTest.java | 99 +++++++++++++-------
5 files changed, 171 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index f86d624..7b5f5dd 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -27,17 +27,25 @@ import java.util.concurrent.ConcurrentHashMap;
public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
- private final long retryInitialDelayMs;
- private final double retryDelayMultiplier;
- private final long retryDelayMaxMs;
+ private long retryInitialDelayMs;
+ private double retryDelayMultiplier;
+ private long retryDelayMaxMs;
+ private int retryLimit;
- private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
- private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
+ private Queue<MessageRetryRecord> waiting;
+ private Map<Long,MessageRetryRecord> records;
- public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
- this.retryInitialDelayMs = retryInitialDelayMs;
- this.retryDelayMultiplier = retryDelayMultiplier;
- this.retryDelayMaxMs = retryDelayMaxMs;
+ public ExponentialBackoffMsgRetryManager() {
+
+ }
+
+ public void prepare(SpoutConfig spoutConfig) {
+ this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
+ this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
+ this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
+ this.retryLimit = spoutConfig.retryLimit;
+ this.waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
+ this.records = new ConcurrentHashMap<Long,MessageRetryRecord>();
}
@Override
@@ -86,7 +94,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
@Override
- public boolean shouldRetryMsg(Long offset) {
+ public boolean shouldReEmitMsg(Long offset) {
MessageRetryRecord record = this.records.get(offset);
return record != null &&
this.waiting.contains(record) &&
@@ -94,7 +102,15 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
@Override
- public Set<Long> clearInvalidMessages(Long kafkaOffset) {
+ public boolean retryFurther(Long offset) {
+ MessageRetryRecord record = this.records.get(offset);
+ return ! (record != null &&
+ this.waiting.contains(record) &&
+ this.retryLimit <= record.retryNum);
+ }
+
+ @Override
+ public Set<Long> clearOffsetsBefore(Long kafkaOffset) {
Set<Long> invalidOffsets = new HashSet<Long>();
for(Long offset : records.keySet()){
if(offset < kafkaOffset){
http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index e9a7092..9a3b19f 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -17,13 +17,58 @@
*/
package org.apache.storm.kafka;
+import java.io.Serializable;
import java.util.Set;
-public interface FailedMsgRetryManager {
- public void failed(Long offset);
- public void acked(Long offset);
- public void retryStarted(Long offset);
- public Long nextFailedMessageToRetry();
- public boolean shouldRetryMsg(Long offset);
- public Set<Long> clearInvalidMessages(Long kafkaOffset);
+public interface FailedMsgRetryManager extends Serializable {
+
+ /**
+ * Initialization
+ */
+ void prepare(SpoutConfig spoutConfig);
+
+ /**
+ * Message corresponding to the offset failed in kafka spout.
+ * @param offset
+ */
+ void failed(Long offset);
+
+ /**
+ * Message corresponding to the offset, was acked to kafka spout.
+ * @param offset
+ */
+ void acked(Long offset);
+
+ /**
+ * Message corresponding to the offset, has been re-emitted and under transit.
+ * @param offset
+ */
+ void retryStarted(Long offset);
+
+ /**
+ * The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset
+ * and resend them, except completed messages.
+ * @return
+ */
+ Long nextFailedMessageToRetry();
+
+ /**
+ * @param offset
+ * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
+ */
+ boolean shouldReEmitMsg(Long offset);
+
+ /**
+ * Spout will clean up the state for this offset if false is returned.
+ * @param offset
+ * @return True if the message will be retried again. False otherwise.
+ */
+ boolean retryFurther(Long offset);
+
+ /**
+ * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
+ * @param kafkaOffset
+ * @return Set of offsets removed.
+ */
+ Set<Long> clearOffsetsBefore(Long kafkaOffset);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index f8d2c41..f42b8f2 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -47,6 +47,9 @@ public class PartitionManager {
private final CountMetric _fetchAPIMessageCount;
// Count of messages which could not be emitted or retried because they were deleted from kafka
private final CountMetric _lostMessageCount;
+ // Count of messages which were not retried because failedMsgRetryManager didn't consider offset eligible for
+ // retry
+ private final CountMetric _messageIneligibleForRetryCount;
Long _emittedToOffset;
// _pending key = Kafka offset, value = time at which the message was first submitted to the topology
private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
@@ -73,9 +76,14 @@ public class PartitionManager {
_stormConf = stormConf;
numberAcked = numberFailed = 0;
- _failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs,
- _spoutConfig.retryDelayMultiplier,
- _spoutConfig.retryDelayMaxMs);
+ try {
+ _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
+ _failedMsgRetryManager.prepare(spoutConfig);
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
+ FailedMsgRetryManager.class,
+ spoutConfig.failedMsgRetryManagerClass), e);
+ }
String jsonTopologyId = null;
Long jsonOffset = null;
@@ -121,6 +129,7 @@ public class PartitionManager {
_fetchAPICallCount = new CountMetric();
_fetchAPIMessageCount = new CountMetric();
_lostMessageCount = new CountMetric();
+ _messageIneligibleForRetryCount = new CountMetric();
}
public Map getMetricsDataMap() {
@@ -130,6 +139,7 @@ public class PartitionManager {
ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset());
+ ret.put(_partition + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset());
return ret;
}
@@ -198,7 +208,7 @@ public class PartitionManager {
// all the failed offsets, that are earlier than actual EarliestTime
// offset, since they are anyway not there.
// These calls to broker API will be then saved.
- Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(offset);
+ Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);
// Omitted messages have not been acked and may be lost
if (null != omitted) {
@@ -229,14 +239,14 @@ public class PartitionManager {
// Skip any old offsets.
continue;
}
- if (processingNewTuples || this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+ if (processingNewTuples || this._failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
numMessages += 1;
if (!_pending.containsKey(cur_offset)) {
_pending.put(cur_offset, System.currentTimeMillis());
}
_waitingToEmit.add(msg);
_emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
- if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+ if (_failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
this._failedMsgRetryManager.retryStarted(cur_offset);
}
}
@@ -273,7 +283,15 @@ public class PartitionManager {
throw new RuntimeException("Too many tuple failures");
}
- this._failedMsgRetryManager.failed(offset);
+ // Offset may not be considered for retry by failedMsgRetryManager
+ if (this._failedMsgRetryManager.retryFurther(offset)) {
+ this._failedMsgRetryManager.failed(offset);
+ } else {
+ // state for the offset should be cleaned up
+ _messageIneligibleForRetryCount.incr();
+ _pending.remove(offset);
+ this._failedMsgRetryManager.acked(offset);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
index 1ac41c8..415ce0a 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
@@ -37,6 +37,8 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;
+ public int retryLimit = Integer.MAX_VALUE;
+ public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
super(hosts, topic);
http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index 8fa6564..f2815e2 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -17,14 +17,14 @@
*/
package org.apache.storm.kafka;
+import org.junit.Test;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.junit.Test;
-
public class ExponentialBackoffMsgRetryManagerTest {
private static final Long TEST_OFFSET = 101L;
@@ -34,52 +34,54 @@ public class ExponentialBackoffMsgRetryManagerTest {
@Test
public void testImmediateRetry() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+
+
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
manager.failed(TEST_OFFSET);
next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testSingleDelay() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(100, 1d, 1000);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(100, 1d, 1000, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Thread.sleep(5);
Long next = manager.nextFailedMessageToRetry();
assertNull("expect no message ready for retry yet", next);
- assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
Thread.sleep(100);
next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testExponentialBackoff() throws Exception {
final long initial = 10;
final double mult = 2d;
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, initial * 10);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, initial * 10, Integer.MAX_VALUE);
long expectedWaitTime = initial;
for (long i = 0L; i < 3L; ++i) {
manager.failed(TEST_OFFSET);
Thread.sleep((expectedWaitTime + 1L) / 2L);
- assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
Thread.sleep((expectedWaitTime + 1L) / 2L);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
expectedWaitTime *= mult;
@@ -91,7 +93,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
final long initial = 10;
final double mult = 2d;
final long max = 20;
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Thread.sleep(initial);
@@ -104,8 +106,8 @@ public class ExponentialBackoffMsgRetryManagerTest {
// so TEST_OFFSET2 should come first
Thread.sleep(initial * 2);
- assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
- assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
+ assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
+ assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2));
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next);
@@ -129,27 +131,27 @@ public class ExponentialBackoffMsgRetryManagerTest {
@Test
public void testQueriesAfterRetriedAlready() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
next = manager.nextFailedMessageToRetry();
assertNull("expect no message ready after retried", next);
- assertFalse("message should not be ready after retried", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready after retried", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test(expected = IllegalStateException.class)
public void testRetryWithoutFail() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.retryStarted(TEST_OFFSET);
}
@Test(expected = IllegalStateException.class)
public void testFailRetryRetry() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
try {
manager.retryStarted(TEST_OFFSET);
@@ -157,7 +159,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
fail("IllegalStateException unexpected here: " + ise);
}
- assertFalse("message should not be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
}
@@ -166,19 +168,19 @@ public class ExponentialBackoffMsgRetryManagerTest {
final long initial = 100;
final double mult = 2d;
final long max = 2000;
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, Integer.MAX_VALUE);
long expectedWaitTime = initial;
for (long i = 0L; i < 4L; ++i) {
manager.failed(TEST_OFFSET);
Thread.sleep((expectedWaitTime + 1L) / 2L);
- assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
Thread.sleep((expectedWaitTime + 1L) / 2L);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max);
@@ -187,42 +189,42 @@ public class ExponentialBackoffMsgRetryManagerTest {
@Test
public void testFailThenAck() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.acked(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertNull("expect no message ready after acked", next);
- assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready after acked", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testAckThenFail() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.acked(TEST_OFFSET);
- assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready after acked", manager.shouldReEmitMsg(TEST_OFFSET));
manager.failed(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testClearInvalidMessages() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
manager.failed(TEST_OFFSET2);
manager.failed(TEST_OFFSET3);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET3));
- manager.clearInvalidMessages(TEST_NEW_OFFSET);
+ manager.clearOffsetsBefore(TEST_NEW_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET3, next);
@@ -232,4 +234,33 @@ public class ExponentialBackoffMsgRetryManagerTest {
assertNull("expect no message ready after acked", next);
}
+ @Test
+ public void testMaxRetry() throws Exception {
+ final long initial = 100;
+ final double mult = 2d;
+ final long max = 2000;
+ final int maxRetries = 2;
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, maxRetries);
+ assertTrue(manager.retryFurther(TEST_OFFSET));
+ manager.failed(TEST_OFFSET);
+
+ assertTrue(manager.retryFurther(TEST_OFFSET));
+ manager.failed(TEST_OFFSET);
+
+ assertFalse(manager.retryFurther(TEST_OFFSET));
+ }
+
+ private ExponentialBackoffMsgRetryManager buildExponentialBackoffMsgRetryManager(long retryInitialDelayMs,
+ double retryDelayMultiplier,
+ long retryDelayMaxMs,
+ int retryLimit) {
+ SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null);
+ spoutConfig.retryInitialDelayMs = retryInitialDelayMs;
+ spoutConfig.retryDelayMultiplier = retryDelayMultiplier;
+ spoutConfig.retryDelayMaxMs = retryDelayMaxMs;
+ spoutConfig.retryLimit = retryLimit;
+ ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager();
+ exponentialBackoffMsgRetryManager.prepare(spoutConfig);
+ return exponentialBackoffMsgRetryManager;
+ }
}