You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/01/14 17:05:36 UTC

[15/44] storm git commit: do the time calc only once

do the time calc only once

Compute the retry due time for a MessageRetryRecord once, when the record is
created, instead of recalculating the delay every time isReadyForRetry() is called.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2aa03468
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2aa03468
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2aa03468

Branch: refs/heads/master
Commit: 2aa03468db3971a11d07048628908074f2a0d3b3
Parents: 79e3efe
Author: Rick Kilgore <ri...@hbo.com>
Authored: Tue Sep 9 16:03:34 2014 -0700
Committer: Rick Kilgore <ri...@hbo.com>
Committed: Tue Sep 9 16:03:34 2014 -0700

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/PartitionManager.java   | 33 +++++++++++---------
 1 file changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2aa03468/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 02edf31..c66545e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -223,7 +223,9 @@ public class PartitionManager {
             LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
             failed.add(offset);
             MessageRetryRecord retryRecord = retryRecords.get(offset);
-            retryRecords.put(offset, retryRecord == null ? new MessageRetryRecord() : retryRecord.retryAgainRecord());
+            retryRecords.put(offset, retryRecord == null
+                                     ? new MessageRetryRecord()
+                                     : retryRecord.createNextRetryRecord());
             numberFailed++;
             if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
                 throw new RuntimeException("Too many tuple failures");
@@ -296,27 +298,30 @@ public class PartitionManager {
      * </ul>
      */
     class MessageRetryRecord {
-        private final long failTimeUTC;
-        private final int attemptsAlreadyPerformed;
-
-        private MessageRetryRecord(int attemptsAlreadyPerformed) {
-            this.failTimeUTC = new Date().getTime();
-            this.attemptsAlreadyPerformed = attemptsAlreadyPerformed;
-        }
+        private final int retryNum;
+        private final long retryTimeUTC;
 
         public MessageRetryRecord() {
             this(1);
         }
 
-        public MessageRetryRecord retryAgainRecord() {
-            return new MessageRetryRecord(this.attemptsAlreadyPerformed + 1);
+        private MessageRetryRecord(int retryNum) {
+            this.retryNum = retryNum;
+            this.retryTimeUTC = new Date().getTime() + calculateRetryDelay(this.retryNum);
         }
 
-        public boolean isReadyForRetry() {
-            double delayMultiplier = Math.pow(_spoutConfig.retryDelayMultiplier, this.attemptsAlreadyPerformed - 1);
+        public MessageRetryRecord createNextRetryRecord() {
+            return new MessageRetryRecord(this.retryNum + 1);
+        }
+
+        private long calculateRetryDelay(int retryNum) {
+            double delayMultiplier = Math.pow(_spoutConfig.retryDelayMultiplier, retryNum - 1);
             long delayThisRetryMs = (long) (_spoutConfig.retryInitialDelayMs * delayMultiplier);
-            delayThisRetryMs = Math.min(delayThisRetryMs, _spoutConfig.retryDelayMaxMs);
-            return new Date().getTime() - this.failTimeUTC > delayThisRetryMs;
+            return Math.min(delayThisRetryMs, _spoutConfig.retryDelayMaxMs);
+        }
+
+        public boolean isReadyForRetry() {
+            return new Date().getTime() > this.retryTimeUTC;
         }
     }
 }