You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/06/10 15:04:32 UTC

[04/10] storm git commit: STORM-1705: Cap number of retries for a failed message

STORM-1705: Cap number of retries for a failed message

Signed-off-by: P. Taylor Goetz <pt...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/567683f3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/567683f3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/567683f3

Branch: refs/heads/1.0.x-branch
Commit: 567683f3ac44af23afc6ab0b7a9877ec82957618
Parents: 0bfa2bc
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Tue Apr 12 15:28:06 2016 +0530
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jun 10 10:35:43 2016 -0400

----------------------------------------------------------------------
 .../ExponentialBackoffMsgRetryManager.java      | 38 +++++---
 .../storm/kafka/FailedMsgRetryManager.java      | 59 ++++++++++--
 .../apache/storm/kafka/PartitionManager.java    | 32 +++++--
 .../jvm/org/apache/storm/kafka/SpoutConfig.java |  2 +
 .../ExponentialBackoffMsgRetryManagerTest.java  | 99 +++++++++++++-------
 5 files changed, 171 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/567683f3/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index f86d624..7b5f5dd 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -27,17 +27,25 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
 
-    private final long retryInitialDelayMs;
-    private final double retryDelayMultiplier;
-    private final long retryDelayMaxMs;
+    private long retryInitialDelayMs;
+    private double retryDelayMultiplier;
+    private long retryDelayMaxMs;
+    private int retryLimit;
 
-    private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
-    private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
+    private Queue<MessageRetryRecord> waiting;
+    private Map<Long,MessageRetryRecord> records;
 
-    public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
-        this.retryInitialDelayMs = retryInitialDelayMs;
-        this.retryDelayMultiplier = retryDelayMultiplier;
-        this.retryDelayMaxMs = retryDelayMaxMs;
+    public ExponentialBackoffMsgRetryManager() {
+
+    }
+
+    public void prepare(SpoutConfig spoutConfig) {
+        this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
+        this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
+        this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
+        this.retryLimit = spoutConfig.retryLimit;
+        this.waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
+        this.records = new ConcurrentHashMap<Long,MessageRetryRecord>();
     }
 
     @Override
@@ -86,7 +94,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
     }
 
     @Override
-    public boolean shouldRetryMsg(Long offset) {
+    public boolean shouldReEmitMsg(Long offset) {
         MessageRetryRecord record = this.records.get(offset);
         return record != null &&
                 this.waiting.contains(record) &&
@@ -94,7 +102,15 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
     }
 
     @Override
-    public Set<Long> clearInvalidMessages(Long kafkaOffset) {
+    public boolean retryFurther(Long offset) {
+        // Queue membership is deliberately not checked: retryStarted() takes the record out of the waiting
+        // queue before a re-emitted tuple can fail, so requiring it would make the retry limit unreachable.
+        MessageRetryRecord record = this.records.get(offset);
+        return record == null || record.retryNum < this.retryLimit;
+    }
+
+    @Override
+    public Set<Long> clearOffsetsBefore(Long kafkaOffset) {
         Set<Long> invalidOffsets = new HashSet<Long>(); 
         for(Long offset : records.keySet()){
             if(offset < kafkaOffset){

http://git-wip-us.apache.org/repos/asf/storm/blob/567683f3/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index e9a7092..9a3b19f 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -17,13 +17,58 @@
  */
 package org.apache.storm.kafka;
 
+import java.io.Serializable;
 import java.util.Set;
 
-public interface FailedMsgRetryManager {
-    public void failed(Long offset);
-    public void acked(Long offset);
-    public void retryStarted(Long offset);
-    public Long nextFailedMessageToRetry();
-    public boolean shouldRetryMsg(Long offset);
-    public Set<Long> clearInvalidMessages(Long kafkaOffset);
+public interface FailedMsgRetryManager extends Serializable {
+
+    /**
+     * Initializes this manager from the spout configuration; the spout calls it right after instantiation.
+     */
+    void prepare(SpoutConfig spoutConfig);
+
+    /**
+     * Records that the message at the given offset failed in the kafka spout.
+     * @param offset kafka offset of the failed message
+     */
+    void failed(Long offset);
+
+    /**
+     * Records that the message at the given offset was acked to the kafka spout.
+     * @param offset kafka offset of the acked message
+     */
+    void acked(Long offset);
+
+    /**
+     * Records that the message at the given offset has been re-emitted and is in transit.
+     * @param offset kafka offset of the re-emitted message
+     */
+    void retryStarted(Long offset);
+
+    /**
+     * The offset of the message that is to be re-emitted next. The spout fetches messages starting from
+     * this offset and resends them, except for completed messages.
+     * @return the offset to re-emit next; the bundled implementation returns null when nothing is ready
+     */
+    Long nextFailedMessageToRetry();
+
+    /**
+     * @param offset kafka offset of the message to check
+     * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
+     */
+    boolean shouldReEmitMsg(Long offset);
+
+    /**
+     * The spout cleans up its state for this offset if false is returned.
+     * @param offset kafka offset of the message that just failed
+     * @return True if the message will be retried again. False otherwise.
+     */
+    boolean retryFurther(Long offset);
+
+    /**
+     * Clears any offsets before kafkaOffset. These offsets are no longer available in kafka.
+     * @param kafkaOffset earliest offset to keep; tracked offsets below it are dropped
+     * @return Set of offsets removed.
+     */
+    Set<Long> clearOffsetsBefore(Long kafkaOffset);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/567683f3/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index f9d8cf6..16bdccf 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -47,6 +47,9 @@ public class PartitionManager {
     private final CountMetric _fetchAPIMessageCount;
     // Count of messages which could not be emitted or retried because they were deleted from kafka
     private final CountMetric _lostMessageCount;
+    // Count of messages which were not retried because failedMsgRetryManager didn't consider offset eligible for
+    // retry
+    private final CountMetric _messageIneligibleForRetryCount;
     Long _emittedToOffset;
     // _pending key = Kafka offset, value = time at which the message was first submitted to the topology
     private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
@@ -73,9 +76,14 @@ public class PartitionManager {
         _stormConf = stormConf;
         numberAcked = numberFailed = 0;
 
-        _failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs,
-                                                                           _spoutConfig.retryDelayMultiplier,
-                                                                           _spoutConfig.retryDelayMaxMs);
+        try {
+            _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
+            _failedMsgRetryManager.prepare(spoutConfig);
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+            throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
+                                                             FailedMsgRetryManager.class,
+                                                             spoutConfig.failedMsgRetryManagerClass), e);
+        }
 
         String jsonTopologyId = null;
         Long jsonOffset = null;
@@ -121,6 +129,7 @@ public class PartitionManager {
         _fetchAPICallCount = new CountMetric();
         _fetchAPIMessageCount = new CountMetric();
         _lostMessageCount = new CountMetric();
+        _messageIneligibleForRetryCount = new CountMetric();
     }
 
     public Map getMetricsDataMap() {
@@ -130,6 +139,7 @@ public class PartitionManager {
         ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
         ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
         ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset());
+        ret.put(_partition + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset());
         return ret;
     }
 
@@ -198,7 +208,7 @@ public class PartitionManager {
                 // all the failed offsets, that are earlier than actual EarliestTime
                 // offset, since they are anyway not there.
                 // These calls to broker API will be then saved.
-                Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(offset);
+                Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);
 
                 // Omitted messages have not been acked and may be lost
                 if (null != omitted) {
@@ -230,14 +240,14 @@ public class PartitionManager {
                     // Skip any old offsets.
                     continue;
                 }
-                if (processingNewTuples || this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+                if (processingNewTuples || this._failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
                     numMessages += 1;
                     if (!_pending.containsKey(cur_offset)) {
                         _pending.put(cur_offset, System.currentTimeMillis());
                     }
                     _waitingToEmit.add(msg);
                     _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
-                    if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+                    if (_failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
                         this._failedMsgRetryManager.retryStarted(cur_offset);
                     }
                 }
@@ -274,7 +284,15 @@ public class PartitionManager {
                 throw new RuntimeException("Too many tuple failures");
             }
 
-            this._failedMsgRetryManager.failed(offset);
+            // Offset may not be considered for retry by failedMsgRetryManager
+            if (this._failedMsgRetryManager.retryFurther(offset)) {
+                this._failedMsgRetryManager.failed(offset);
+            } else {
+                // state for the offset should be cleaned up
+                _messageIneligibleForRetryCount.incr();
+                _pending.remove(offset);
+                this._failedMsgRetryManager.acked(offset);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/567683f3/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
index 1ac41c8..415ce0a 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
@@ -37,6 +37,8 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
     public long retryInitialDelayMs = 0;
     public double retryDelayMultiplier = 1.0;
     public long retryDelayMaxMs = 60 * 1000;
+    public int retryLimit = Integer.MAX_VALUE;
+    public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
 
     public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
         super(hosts, topic);

http://git-wip-us.apache.org/repos/asf/storm/blob/567683f3/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index 8fa6564..f2815e2 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -17,14 +17,14 @@
  */
 package org.apache.storm.kafka;
 
+import org.junit.Test;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import org.junit.Test;
-
 public class ExponentialBackoffMsgRetryManagerTest {
 
     private static final Long TEST_OFFSET = 101L;
@@ -34,52 +34,54 @@ public class ExponentialBackoffMsgRetryManagerTest {
 
     @Test
     public void testImmediateRetry() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        
+        
+        ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
         manager.failed(TEST_OFFSET);
         Long next = manager.nextFailedMessageToRetry();
         assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-        assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+        assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
 
         manager.retryStarted(TEST_OFFSET);
 
         manager.failed(TEST_OFFSET);
         next = manager.nextFailedMessageToRetry();
         assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-        assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+        assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
     }
 
     @Test
     public void testSingleDelay() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(100, 1d, 1000);
+        ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(100, 1d, 1000, Integer.MAX_VALUE);
         manager.failed(TEST_OFFSET);
         Thread.sleep(5);
         Long next = manager.nextFailedMessageToRetry();
         assertNull("expect no message ready for retry yet", next);
-        assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+        assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
 
         Thread.sleep(100);
         next = manager.nextFailedMessageToRetry();
         assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+        assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
     }
 
     @Test
     public void testExponentialBackoff() throws Exception {
         final long initial = 10;
         final double mult = 2d;
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, initial * 10);
+        ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, initial * 10, Integer.MAX_VALUE);
 
         long expectedWaitTime = initial;
         for (long i = 0L; i < 3L; ++i) {
             manager.failed(TEST_OFFSET);
 
             Thread.sleep((expectedWaitTime + 1L) / 2L);
-            assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+            assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
 
             Thread.sleep((expectedWaitTime + 1L) / 2L);
             Long next = manager.nextFailedMessageToRetry();
             assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-            assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+            assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
 
             manager.retryStarted(TEST_OFFSET);
             expectedWaitTime *= mult;
@@ -91,7 +93,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
         final long initial = 10;
         final double mult = 2d;
         final long max = 20;
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+        ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, Integer.MAX_VALUE);
 
         manager.failed(TEST_OFFSET);
         Thread.sleep(initial);
@@ -104,8 +106,8 @@ public class ExponentialBackoffMsgRetryManagerTest {
         // so TEST_OFFSET2 should come first
 
         Thread.sleep(initial * 2);
-        assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
-        assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
+        assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
+        assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2));
 
         Long next = manager.nextFailedMessageToRetry();
         assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next);
@@ -129,27 +131,27 @@ public class ExponentialBackoffMsgRetryManagerTest {
 
     @Test
     public void testQueriesAfterRetriedAlready() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
         manager.failed(TEST_OFFSET);
         Long next = manager.nextFailedMessageToRetry();
         assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-        assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET));
+        assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET));
 
         manager.retryStarted(TEST_OFFSET);
         next = manager.nextFailedMessageToRetry();
         assertNull("expect no message ready after retried", next);
-        assertFalse("message should not be ready after retried", manager.shouldRetryMsg(TEST_OFFSET));
+        assertFalse("message should not be ready after retried", manager.shouldReEmitMsg(TEST_OFFSET));
     }
 
     @Test(expected = IllegalStateException.class)
     public void testRetryWithoutFail() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
         manager.retryStarted(TEST_OFFSET);
     }
 
     @Test(expected = IllegalStateException.class)
     public void testFailRetryRetry() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
         manager.failed(TEST_OFFSET);
         try {
             manager.retryStarted(TEST_OFFSET);
@@ -157,7 +159,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
             fail("IllegalStateException unexpected here: " + ise);
         }
 
-        assertFalse("message should not be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+        assertFalse("message should not be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
         manager.retryStarted(TEST_OFFSET);
     }
 
@@ -166,19 +168,19 @@ public class ExponentialBackoffMsgRetryManagerTest {
         final long initial = 100;
         final double mult = 2d;
         final long max = 2000;
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max);
+        ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, Integer.MAX_VALUE);
 
         long expectedWaitTime = initial;
         for (long i = 0L; i < 4L; ++i) {
             manager.failed(TEST_OFFSET);
 
             Thread.sleep((expectedWaitTime + 1L) / 2L);
-            assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET));
+            assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET));
 
             Thread.sleep((expectedWaitTime + 1L) / 2L);
             Long next = manager.nextFailedMessageToRetry();
             assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-            assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+            assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
 
             manager.retryStarted(TEST_OFFSET);
             expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max);
@@ -187,42 +189,42 @@ public class ExponentialBackoffMsgRetryManagerTest {
 
     @Test
     public void testFailThenAck() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
         manager.failed(TEST_OFFSET);
-        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+        assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
 
         manager.acked(TEST_OFFSET);
 
         Long next = manager.nextFailedMessageToRetry();
         assertNull("expect no message ready after acked", next);
-        assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+        assertFalse("message should not be ready after acked", manager.shouldReEmitMsg(TEST_OFFSET));
     }
 
     @Test
     public void testAckThenFail() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
         manager.acked(TEST_OFFSET);
-        assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET));
+        assertFalse("message should not be ready after acked", manager.shouldReEmitMsg(TEST_OFFSET));
 
         manager.failed(TEST_OFFSET);
 
         Long next = manager.nextFailedMessageToRetry();
         assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
-        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+        assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
     }
     
     @Test
     public void testClearInvalidMessages() throws Exception {
-        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE);
         manager.failed(TEST_OFFSET);
         manager.failed(TEST_OFFSET2);
         manager.failed(TEST_OFFSET3);
         
-        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
-        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
-        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3));
+        assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET));
+        assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2));
+        assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET3));
 
-        manager.clearInvalidMessages(TEST_NEW_OFFSET);
+        manager.clearOffsetsBefore(TEST_NEW_OFFSET);
 
         Long next = manager.nextFailedMessageToRetry();
         assertEquals("expect test offset next available for retry", TEST_OFFSET3, next);
@@ -232,4 +234,33 @@ public class ExponentialBackoffMsgRetryManagerTest {
         assertNull("expect no message ready after acked", next);
     }
 
+    @Test
+    public void testMaxRetry() throws Exception {
+        final long initial = 100;
+        final double mult = 2d;
+        final long max = 2000;
+        final int maxRetries = 2; // retry limit under test
+        ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, maxRetries);
+        assertTrue(manager.retryFurther(TEST_OFFSET)); // no failure recorded for the offset yet
+        manager.failed(TEST_OFFSET); // NOTE(review): retryStarted() is never called, unlike the spout's re-emit path
+
+        assertTrue(manager.retryFurther(TEST_OFFSET)); // one failure recorded, still under the limit
+        manager.failed(TEST_OFFSET);
+
+        assertFalse(manager.retryFurther(TEST_OFFSET)); // second failure reaches the limit of 2
+    }
+    
+    private ExponentialBackoffMsgRetryManager buildExponentialBackoffMsgRetryManager(long retryInitialDelayMs, 
+                                                                                     double retryDelayMultiplier,
+                                                                                     long retryDelayMaxMs,
+                                                                                     int retryLimit) {
+        SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null); // prepare() reads only the retry* fields set below
+        spoutConfig.retryInitialDelayMs = retryInitialDelayMs;
+        spoutConfig.retryDelayMultiplier = retryDelayMultiplier;
+        spoutConfig.retryDelayMaxMs = retryDelayMaxMs;
+        spoutConfig.retryLimit = retryLimit; 
+        ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager();
+        exponentialBackoffMsgRetryManager.prepare(spoutConfig); // copies the retry settings and allocates the queue and map
+        return exponentialBackoffMsgRetryManager;
+    }
 }