You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/06/10 15:04:34 UTC
[06/10] storm git commit: STORM-1705: Cap number of retries for a failed message
STORM-1705: Cap number of retries for a failed message
Signed-off-by: P. Taylor Goetz <pt...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd51eacb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd51eacb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd51eacb
Branch: refs/heads/master
Commit: fd51eacb58c52d7ed35ecbf52b7e6a9509fe1a7f
Parents: 0b54767
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Tue Apr 12 15:28:06 2016 +0530
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jun 10 10:36:15 2016 -0400
----------------------------------------------------------------------
.../ExponentialBackoffMsgRetryManager.java | 38 +++++---
.../storm/kafka/FailedMsgRetryManager.java | 59 ++++++++++--
.../apache/storm/kafka/PartitionManager.java | 32 +++++--
.../jvm/org/apache/storm/kafka/SpoutConfig.java | 2 +
.../ExponentialBackoffMsgRetryManagerTest.java | 99 +++++++++++++-------
5 files changed, 171 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index f86d624..7b5f5dd 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -27,17 +27,25 @@ import java.util.concurrent.ConcurrentHashMap;
public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
- private final long retryInitialDelayMs;
- private final double retryDelayMultiplier;
- private final long retryDelayMaxMs;
+ private long retryInitialDelayMs;
+ private double retryDelayMultiplier;
+ private long retryDelayMaxMs;
+ private int retryLimit;
- private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
- private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
+ private Queue<MessageRetryRecord> waiting;
+ private Map<Long,MessageRetryRecord> records;
- public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
- this.retryInitialDelayMs = retryInitialDelayMs;
- this.retryDelayMultiplier = retryDelayMultiplier;
- this.retryDelayMaxMs = retryDelayMaxMs;
+ public ExponentialBackoffMsgRetryManager() {
+
+ }
+
+ public void prepare(SpoutConfig spoutConfig) {
+ this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
+ this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
+ this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
+ this.retryLimit = spoutConfig.retryLimit;
+ this.waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
+ this.records = new ConcurrentHashMap<Long,MessageRetryRecord>();
}
@Override
@@ -86,7 +94,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
@Override
- public boolean shouldRetryMsg(Long offset) {
+ public boolean shouldReEmitMsg(Long offset) {
MessageRetryRecord record = this.records.get(offset);
return record != null &&
this.waiting.contains(record) &&
@@ -94,7 +102,15 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
@Override
- public Set<Long> clearInvalidMessages(Long kafkaOffset) {
+ public boolean retryFurther(Long offset) {
+ // A failing offset is normally in flight (retryStarted took it out of waiting), so only its retry count decides.
+ MessageRetryRecord record = this.records.get(offset);
+ // No record means the offset never failed before and may be retried; otherwise stop once retryLimit is reached.
+ return record == null || record.retryNum < this.retryLimit;
+ }
+
+ @Override
+ public Set<Long> clearOffsetsBefore(Long kafkaOffset) {
Set<Long> invalidOffsets = new HashSet<Long>();
for(Long offset : records.keySet()){
if(offset < kafkaOffset){
http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index e9a7092..9a3b19f 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -17,13 +17,58 @@
*/
package org.apache.storm.kafka;
+import java.io.Serializable;
import java.util.Set;
-public interface FailedMsgRetryManager {
- public void failed(Long offset);
- public void acked(Long offset);
- public void retryStarted(Long offset);
- public Long nextFailedMessageToRetry();
- public boolean shouldRetryMsg(Long offset);
- public Set<Long> clearInvalidMessages(Long kafkaOffset);
+public interface FailedMsgRetryManager extends Serializable {
+
+ /**
+ * Initializes the retry manager from the given spout configuration.
+ */
+ void prepare(SpoutConfig spoutConfig);
+
+ /**
+ * Message corresponding to the offset failed in kafka spout.
+ * @param offset
+ */
+ void failed(Long offset);
+
+ /**
+ * Message corresponding to the offset was acked to the kafka spout.
+ * @param offset
+ */
+ void acked(Long offset);
+
+ /**
+ * Message corresponding to the offset has been re-emitted and is in transit.
+ * @param offset
+ */
+ void retryStarted(Long offset);
+
+ /**
+ * The offset of the message which is to be re-emitted. Spout will fetch messages starting from this offset
+ * and resend them, except for completed messages.
+ * @return
+ */
+ Long nextFailedMessageToRetry();
+
+ /**
+ * @param offset
+ * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
+ */
+ boolean shouldReEmitMsg(Long offset);
+
+ /**
+ * Spout will clean up the state for this offset if false is returned.
+ * @param offset
+ * @return True if the message will be retried again. False otherwise.
+ */
+ boolean retryFurther(Long offset);
+
+ /**
+ * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
+ * @param kafkaOffset
+ * @return Set of offsets removed.
+ */
+ Set<Long> clearOffsetsBefore(Long kafkaOffset);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index f8d2c41..f42b8f2 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -47,6 +47,9 @@ public class PartitionManager {
private final CountMetric _fetchAPIMessageCount;
// Count of messages which could not be emitted or retried because they were deleted from kafka
private final CountMetric _lostMessageCount;
+ // Count of messages which were not retried because failedMsgRetryManager didn't consider offset eligible for
+ // retry
+ private final CountMetric _messageIneligibleForRetryCount;
Long _emittedToOffset;
// _pending key = Kafka offset, value = time at which the message was first submitted to the topology
private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
@@ -73,9 +76,14 @@ public class PartitionManager {
_stormConf = stormConf;
numberAcked = numberFailed = 0;
- _failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs,
- _spoutConfig.retryDelayMultiplier,
- _spoutConfig.retryDelayMaxMs);
+ try {
+ _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
+ _failedMsgRetryManager.prepare(spoutConfig);
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
+ FailedMsgRetryManager.class,
+ spoutConfig.failedMsgRetryManagerClass), e);
+ }
String jsonTopologyId = null;
Long jsonOffset = null;
@@ -121,6 +129,7 @@ public class PartitionManager {
_fetchAPICallCount = new CountMetric();
_fetchAPIMessageCount = new CountMetric();
_lostMessageCount = new CountMetric();
+ _messageIneligibleForRetryCount = new CountMetric();
}
public Map getMetricsDataMap() {
@@ -130,6 +139,7 @@ public class PartitionManager {
ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset());
+ ret.put(_partition + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset());
return ret;
}
@@ -198,7 +208,7 @@ public class PartitionManager {
// all the failed offsets, that are earlier than actual EarliestTime
// offset, since they are anyway not there.
// These calls to broker API will be then saved.
- Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(offset);
+ Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);
// Omitted messages have not been acked and may be lost
if (null != omitted) {
@@ -229,14 +239,14 @@ public class PartitionManager {
// Skip any old offsets.
continue;
}
- if (processingNewTuples || this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+ if (processingNewTuples || this._failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
numMessages += 1;
if (!_pending.containsKey(cur_offset)) {
_pending.put(cur_offset, System.currentTimeMillis());
}
_waitingToEmit.add(msg);
_emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
- if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+ if (_failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
this._failedMsgRetryManager.retryStarted(cur_offset);
}
}
@@ -273,7 +283,15 @@ public class PartitionManager {
throw new RuntimeException("Too many tuple failures");
}
- this._failedMsgRetryManager.failed(offset);
+ // Offset may not be considered for retry by failedMsgRetryManager
+ if (this._failedMsgRetryManager.retryFurther(offset)) {
+ this._failedMsgRetryManager.failed(offset);
+ } else {
+ // state for the offset should be cleaned up
+ _messageIneligibleForRetryCount.incr();
+ _pending.remove(offset);
+ this._failedMsgRetryManager.acked(offset);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
index 1ac41c8..415ce0a 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
@@ -37,6 +37,8 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;
+ public int retryLimit = Integer.MAX_VALUE;
+ public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
super(hosts, topic);
http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index 8fa6564..f2815e2 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -17,14 +17,14 @@
*/
package org.apache.storm.kafka;
+import org.junit.Test;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import org.junit.Test;
-
public class ExponentialBackoffMsgRetryManagerTest {
private static final Long TEST_OFFSET = 101L;
@@ -34,52 +34,54 @@ public class ExponentialBackoffMsgRetryManagerTest {
@Test
public void testImmediateRetry() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+
+
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
manager.failed(TEST_OFFSET);
next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testSingleDelay() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(100, 1d, 1000);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(100, 1d, 1000, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Thread.sleep(5);
Long next = manager.nextFailedMessageToRetry();
assertNull("expect no message ready for retry yet", next);
- assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
Thread.sleep(100);
next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testExponentialBackoff() throws Exception {
final long initial = 10;
final double mult = 2d;
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, initial * 10);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, initial * 10, Integer.MAX_VALUE);
long expectedWaitTime = initial;
for (long i = 0L; i < 3L; ++i) {
manager.failed(TEST_OFFSET);
Thread.sleep((expectedWaitTime + 1L) / 2L);
- assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
Thread.sleep((expectedWaitTime + 1L) / 2L);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
expectedWaitTime *= mult;
@@ -91,7 +93,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
final long initial = 10;
final double mult = 2d;
final long max = 20;
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Thread.sleep(initial);
@@ -104,8 +106,8 @@ public class ExponentialBackoffMsgRetryManagerTest {
// so TEST_OFFSET2 should come first
Thread.sleep(initial * 2);
- assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
- assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
+ assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
+ assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2));
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next);
@@ -129,27 +131,27 @@ public class ExponentialBackoffMsgRetryManagerTest {
@Test
public void testQueriesAfterRetriedAlready() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
next = manager.nextFailedMessageToRetry();
assertNull("expect no message ready after retried", next);
- assertFalse("message should not be ready after retried", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready after retried", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test(expected = IllegalStateException.class)
public void testRetryWithoutFail() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.retryStarted(TEST_OFFSET);
}
@Test(expected = IllegalStateException.class)
public void testFailRetryRetry() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
try {
manager.retryStarted(TEST_OFFSET);
@@ -157,7 +159,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
fail("IllegalStateException unexpected here: " + ise);
}
- assertFalse("message should not be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
}
@@ -166,19 +168,19 @@ public class ExponentialBackoffMsgRetryManagerTest {
final long initial = 100;
final double mult = 2d;
final long max = 2000;
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, Integer.MAX_VALUE);
long expectedWaitTime = initial;
for (long i = 0L; i < 4L; ++i) {
manager.failed(TEST_OFFSET);
Thread.sleep((expectedWaitTime + 1L) / 2L);
- assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
Thread.sleep((expectedWaitTime + 1L) / 2L);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.retryStarted(TEST_OFFSET);
expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max);
@@ -187,42 +189,42 @@ public class ExponentialBackoffMsgRetryManagerTest {
@Test
public void testFailThenAck() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
manager.acked(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertNull("expect no message ready after acked", next);
- assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready after acked", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testAckThenFail() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.acked(TEST_OFFSET);
- assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+ assertFalse("message should not be ready after acked", manager.shouldReEmitMsg(TEST_OFFSET));
manager.failed(TEST_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
}
@Test
public void testClearInvalidMessages() throws Exception {
- ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
manager.failed(TEST_OFFSET);
manager.failed(TEST_OFFSET2);
manager.failed(TEST_OFFSET3);
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
- assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2));
+ assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET3));
- manager.clearInvalidMessages(TEST_NEW_OFFSET);
+ manager.clearOffsetsBefore(TEST_NEW_OFFSET);
Long next = manager.nextFailedMessageToRetry();
assertEquals("expect test offset next available for retry", TEST_OFFSET3, next);
@@ -232,4 +234,33 @@ public class ExponentialBackoffMsgRetryManagerTest {
assertNull("expect no message ready after acked", next);
}
+ @Test
+ public void testMaxRetry() throws Exception {
+ final long initial = 100;
+ final double mult = 2d;
+ final long max = 2000;
+ final int maxRetries = 2;
+ ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, maxRetries);
+ assertTrue(manager.retryFurther(TEST_OFFSET));
+ manager.failed(TEST_OFFSET);
+
+ assertTrue(manager.retryFurther(TEST_OFFSET));
+ manager.failed(TEST_OFFSET);
+
+ assertFalse(manager.retryFurther(TEST_OFFSET));
+ }
+
+ private ExponentialBackoffMsgRetryManager buildExponentialBackoffMsgRetryManager(long retryInitialDelayMs,
+ double retryDelayMultiplier,
+ long retryDelayMaxMs,
+ int retryLimit) {
+ SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null);
+ spoutConfig.retryInitialDelayMs = retryInitialDelayMs;
+ spoutConfig.retryDelayMultiplier = retryDelayMultiplier;
+ spoutConfig.retryDelayMaxMs = retryDelayMaxMs;
+ spoutConfig.retryLimit = retryLimit;
+ ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager();
+ exponentialBackoffMsgRetryManager.prepare(spoutConfig);
+ return exponentialBackoffMsgRetryManager;
+ }
}