You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/01/14 17:06:03 UTC

[42/44] storm git commit: fixes from Bobby Evans review

fixes from Bobby Evans review

- Fix a potential long overflow bug in calculateRetryDelay().
- Employ a use-case-specific Comparator instead of an ill-advised, general
  Comparable implementation for sorting retry records in the PriorityQueue.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd066985
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd066985
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd066985

Branch: refs/heads/master
Commit: fd066985a74c3140f61a465cfc49c83a6ddfa713
Parents: f99fe8e
Author: Rick Kilgore <ri...@hbo.com>
Authored: Fri Jan 9 13:34:29 2015 -0800
Committer: Rick Kilgore <ri...@hbo.com>
Committed: Fri Jan 9 13:34:29 2015 -0800

----------------------------------------------------------------------
 .../ExponentialBackoffMsgRetryManager.java      | 29 ++++++++++++++------
 1 file changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fd066985/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
index cace9cd..9b2c904 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -17,10 +17,11 @@
  */
 package storm.kafka;
 
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
-import java.util.SortedMap;
-import java.util.TreeMap;
 
 public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
 
@@ -28,8 +29,8 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
     private final double retryDelayMultiplier;
     private final long retryDelayMaxMs;
 
-    private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>();
-    private SortedMap<Long,MessageRetryRecord> records = new TreeMap<Long,MessageRetryRecord>();
+    private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
+    private Map<Long,MessageRetryRecord> records = new HashMap<Long,MessageRetryRecord>();
 
     public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
         this.retryInitialDelayMs = retryInitialDelayMs;
@@ -103,7 +104,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
      *  </li>
      * </ul>
      */
-    class MessageRetryRecord implements Comparable<MessageRetryRecord> {
+    class MessageRetryRecord {
         private final long offset;
         private final int retryNum;
         private final long retryTimeUTC;
@@ -131,7 +132,11 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
 
         private long calculateRetryDelay() {
             double delayMultiplier = Math.pow(retryDelayMultiplier, this.retryNum - 1);
-            long delayThisRetryMs = (long) (retryInitialDelayMs * delayMultiplier);
+            double delay = retryInitialDelayMs * delayMultiplier;
+            Long maxLong = Long.MAX_VALUE;
+            long delayThisRetryMs = delay >= maxLong.doubleValue()
+                                    ?  maxLong
+                                    : (long) delay;
             return Math.min(delayThisRetryMs, retryDelayMaxMs);
         }
 
@@ -145,10 +150,18 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
         public int hashCode() {
             return Long.valueOf(this.offset).hashCode();
         }
+    }
+
+    class RetryTimeComparator implements Comparator<MessageRetryRecord> {
+
+        @Override
+        public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
+            return Long.compare(record1.retryTimeUTC, record2.retryTimeUTC);
+        }
 
         @Override
-        public int compareTo(MessageRetryRecord other) {
-            return Long.compare(this.retryTimeUTC, other.retryTimeUTC);
+        public boolean equals(Object obj) {
+            return false;
         }
     }
 }