You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/01/14 17:06:03 UTC
[42/44] storm git commit: fixes from Bobby Evans review
fixes from Bobby Evans review
* fix potential long overflow bug
* employ a use-case-specific Comparator instead of an ill-advised, general
  Comparable implementation for sorting retry records in the PriorityQueue
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd066985
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd066985
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd066985
Branch: refs/heads/master
Commit: fd066985a74c3140f61a465cfc49c83a6ddfa713
Parents: f99fe8e
Author: Rick Kilgore <ri...@hbo.com>
Authored: Fri Jan 9 13:34:29 2015 -0800
Committer: Rick Kilgore <ri...@hbo.com>
Committed: Fri Jan 9 13:34:29 2015 -0800
----------------------------------------------------------------------
.../ExponentialBackoffMsgRetryManager.java | 29 ++++++++++++++------
1 file changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fd066985/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
index cace9cd..9b2c904 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -17,10 +17,11 @@
*/
package storm.kafka;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
-import java.util.SortedMap;
-import java.util.TreeMap;
public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
@@ -28,8 +29,8 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
private final double retryDelayMultiplier;
private final long retryDelayMaxMs;
- private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>();
- private SortedMap<Long,MessageRetryRecord> records = new TreeMap<Long,MessageRetryRecord>();
+ private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
+ private Map<Long,MessageRetryRecord> records = new HashMap<Long,MessageRetryRecord>();
public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
this.retryInitialDelayMs = retryInitialDelayMs;
@@ -103,7 +104,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
* </li>
* </ul>
*/
- class MessageRetryRecord implements Comparable<MessageRetryRecord> {
+ class MessageRetryRecord {
private final long offset;
private final int retryNum;
private final long retryTimeUTC;
@@ -131,7 +132,11 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
private long calculateRetryDelay() {
double delayMultiplier = Math.pow(retryDelayMultiplier, this.retryNum - 1);
- long delayThisRetryMs = (long) (retryInitialDelayMs * delayMultiplier);
+ double delay = retryInitialDelayMs * delayMultiplier;
+ Long maxLong = Long.MAX_VALUE;
+ long delayThisRetryMs = delay >= maxLong.doubleValue()
+ ? maxLong
+ : (long) delay;
return Math.min(delayThisRetryMs, retryDelayMaxMs);
}
@@ -145,10 +150,18 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
public int hashCode() {
return Long.valueOf(this.offset).hashCode();
}
+ }
+
+ class RetryTimeComparator implements Comparator<MessageRetryRecord> {
+
+ @Override
+ public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
+ return Long.compare(record1.retryTimeUTC, record2.retryTimeUTC);
+ }
@Override
- public int compareTo(MessageRetryRecord other) {
- return Long.compare(this.retryTimeUTC, other.retryTimeUTC);
+ public boolean equals(Object obj) {
+ return false;
}
}
}