You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2015/07/20 17:25:15 UTC

storm git commit: STORM-643 fixing based on master branch

Repository: storm
Updated Branches:
  refs/heads/master 54f6b32f6 -> f091743d9


STORM-643 fixing based on master branch

Also add a unit test.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f091743d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f091743d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f091743d

Branch: refs/heads/master
Commit: f091743d9ebc935c9bbbc210095884c3d819632b
Parents: 54f6b32
Author: Xin Wang <be...@163.com>
Authored: Sat Jul 18 11:39:51 2015 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jul 21 00:17:28 2015 +0900

----------------------------------------------------------------------
 .../ExponentialBackoffMsgRetryManager.java      | 21 ++++++++++++++--
 .../jvm/storm/kafka/FailedMsgRetryManager.java  |  3 +++
 .../src/jvm/storm/kafka/PartitionManager.java   | 12 +++++++++
 .../ExponentialBackoffMsgRetryManagerTest.java  | 26 +++++++++++++++++++-
 4 files changed, 59 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f091743d/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
index 8c0bbe1..0d74fb8 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -18,10 +18,12 @@
 package storm.kafka;
 
 import java.util.Comparator;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
 
@@ -30,7 +32,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
     private final long retryDelayMaxMs;
 
     private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
-    private Map<Long,MessageRetryRecord> records = new HashMap<Long,MessageRetryRecord>();
+    private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
 
     public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
         this.retryInitialDelayMs = retryInitialDelayMs;
@@ -91,6 +93,21 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
                 System.currentTimeMillis() >= record.retryTimeUTC;
     }
 
+    @Override
+    public Set<Long> clearInvalidMessages(Long kafkaOffset) {
+        Set<Long> invalidOffsets = new HashSet<Long>(); 
+        for(Long offset : records.keySet()){
+            if(offset < kafkaOffset){
+                MessageRetryRecord record = this.records.remove(offset);
+                if (record != null) {
+                    this.waiting.remove(record);
+                    invalidOffsets.add(offset);
+                }
+            }
+        }
+        return invalidOffsets;
+    }
+
     /**
      * A MessageRetryRecord holds the data of how many times a message has
      * failed and been retried, and when the last failure occurred.  It can

http://git-wip-us.apache.org/repos/asf/storm/blob/f091743d/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
index 3f0e117..30c9a24 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
@@ -17,10 +17,13 @@
  */
 package storm.kafka;
 
+import java.util.Set;
+
 public interface FailedMsgRetryManager {
     public void failed(Long offset);
     public void acked(Long offset);
     public void retryStarted(Long offset);
     public Long nextFailedMessageToRetry();
     public boolean shouldRetryMsg(Long offset);
+    public Set<Long> clearInvalidMessages(Long kafkaOffset);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f091743d/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index a7ed879..ce18677 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -171,6 +171,18 @@ public class PartitionManager {
             _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
             LOG.warn("Using new offset: {}", _emittedToOffset);
             // fetch failed, so don't update the metrics
+            
+            // STORM-643: remove failed offsets that are now out of range
+            if (!processingNewTuples) {
+                // When falling back to EarliestTime, discard every failed offset
+                // that is earlier than the actual earliest offset, because those
+                // messages are no longer there anyway.
+                // This also saves the broker API calls that retrying them would cost.
+                Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(_emittedToOffset);
+                
+                LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
+            }
+            
             return;
         }
         long end = System.nanoTime();

http://git-wip-us.apache.org/repos/asf/storm/blob/f091743d/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index 3dd8b38..11ad5b7 100644
--- a/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -29,6 +29,8 @@ public class ExponentialBackoffMsgRetryManagerTest {
 
     private static final Long TEST_OFFSET = 101L;
     private static final Long TEST_OFFSET2 = 102L;
+    private static final Long TEST_OFFSET3 = 105L;
+    private static final Long TEST_NEW_OFFSET = 103L;
 
     @Test
     public void testImmediateRetry() throws Exception {
@@ -208,4 +210,26 @@ public class ExponentialBackoffMsgRetryManagerTest {
         assertEquals("expect test offset next available for retry", TEST_OFFSET, next);
         assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
     }
-}
\ No newline at end of file
+    
+    @Test
+    public void testClearInvalidMessages() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        manager.failed(TEST_OFFSET2);
+        manager.failed(TEST_OFFSET3);
+        
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET));
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2));
+        assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3));
+
+        manager.clearInvalidMessages(TEST_NEW_OFFSET);
+
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", TEST_OFFSET3, next);
+        
+        manager.acked(TEST_OFFSET3);
+        next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message ready after acked", next);
+    }
+
+}