You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/07/20 17:25:15 UTC
storm git commit: STORM-643 fixing based on master branch
Repository: storm
Updated Branches:
refs/heads/master 54f6b32f6 -> f091743d9
STORM-643 fixing based on master branch
Also add a unit test.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f091743d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f091743d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f091743d
Branch: refs/heads/master
Commit: f091743d9ebc935c9bbbc210095884c3d819632b
Parents: 54f6b32
Author: Xin Wang <be...@163.com>
Authored: Sat Jul 18 11:39:51 2015 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jul 21 00:17:28 2015 +0900
----------------------------------------------------------------------
.../ExponentialBackoffMsgRetryManager.java | 21 ++++++++++++++--
.../jvm/storm/kafka/FailedMsgRetryManager.java | 3 +++
.../src/jvm/storm/kafka/PartitionManager.java | 12 +++++++++
.../ExponentialBackoffMsgRetryManagerTest.java | 26 +++++++++++++++++++-
4 files changed, 59 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f091743d/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
index 8c0bbe1..0d74fb8 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -18,10 +18,12 @@
package storm.kafka;
import java.util.Comparator;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
@@ -30,7 +32,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
private final long retryDelayMaxMs;
private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
- private Map<Long,MessageRetryRecord> records = new HashMap<Long,MessageRetryRecord>();
+ private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
this.retryInitialDelayMs = retryInitialDelayMs;
@@ -91,6 +93,21 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
System.currentTimeMillis() >= record.retryTimeUTC;
}
+ @Override
+ public Set<Long> clearInvalidMessages(Long kafkaOffset) {
+ Set<Long> invalidOffsets = new HashSet<Long>();
+ for(Long offset : records.keySet()){
+ if(offset < kafkaOffset){
+ MessageRetryRecord record = this.records.remove(offset);
+ if (record != null) {
+ this.waiting.remove(record);
+ invalidOffsets.add(offset);
+ }
+ }
+ }
+ return invalidOffsets;
+ }
+
/**
* A MessageRetryRecord holds the data of how many times a message has
* failed and been retried, and when the last failure occurred. It can
http://git-wip-us.apache.org/repos/asf/storm/blob/f091743d/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
index 3f0e117..30c9a24 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
@@ -17,10 +17,13 @@
*/
package storm.kafka;
+import java.util.Set;
+
public interface FailedMsgRetryManager {
public void failed(Long offset);
public void acked(Long offset);
public void retryStarted(Long offset);
public Long nextFailedMessageToRetry();
public boolean shouldRetryMsg(Long offset);
+ public Set<Long> clearInvalidMessages(Long kafkaOffset);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/f091743d/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index a7ed879..ce18677 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -171,6 +171,18 @@ public class PartitionManager {
_emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
LOG.warn("Using new offset: {}", _emittedToOffset);
// fetch failed, so don't update the metrics
+
+ // Fix for STORM-643: remove outdated failed offsets
+ if (!processingNewTuples) {
+ // When falling back to EarliestTime, it is better to discard
+ // all failed offsets that are earlier than the actual EarliestTime
+ // offset, since those messages are no longer there anyway.
+ // This also saves the broker API calls that would otherwise be made for them.
+ Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(_emittedToOffset);
+
+ LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
+ }
+
return;
}
long end = System.nanoTime();
http://git-wip-us.apache.org/repos/asf/storm/blob/f091743d/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index 3dd8b38..11ad5b7 100644
--- a/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -29,6 +29,8 @@ public class ExponentialBackoffMsgRetryManagerTest {
private static final Long TEST_OFFSET = 101L;
private static final Long TEST_OFFSET2 = 102L;
+ private static final Long TEST_OFFSET3 = 105L;
+ private static final Long TEST_NEW_OFFSET = 103L;
@Test
public void testImmediateRetry() throws Exception {
@@ -208,4 +210,26 @@ public class ExponentialBackoffMsgRetryManagerTest {
assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
}
-}
\ No newline at end of file
+
+ @Test
+ public void testClearInvalidMessages() throws Exception {
+ ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ manager.failed(TEST_OFFSET);
+ manager.failed(TEST_OFFSET2);
+ manager.failed(TEST_OFFSET3);
+
+ assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
+ assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3));
+
+ manager.clearInvalidMessages(TEST_NEW_OFFSET);
+
+ Long next = manager.nextFailedMessageToRetry();
+ assertEquals("expect test offset next available for retry", TEST_OFFSET3, next);
+
+ manager.acked(TEST_OFFSET3);
+ next = manager.nextFailedMessageToRetry();
+ assertNull("expect no message ready after acked", next);
+ }
+
+}