You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/01/14 17:05:36 UTC
[15/44] storm git commit: do the time calc only once
do the time calc only once
Compute the retry due time for a MessageRetryRecord once, when the record is
created, instead of recomputing it every time isReadyForRetry() is called.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2aa03468
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2aa03468
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2aa03468
Branch: refs/heads/master
Commit: 2aa03468db3971a11d07048628908074f2a0d3b3
Parents: 79e3efe
Author: Rick Kilgore <ri...@hbo.com>
Authored: Tue Sep 9 16:03:34 2014 -0700
Committer: Rick Kilgore <ri...@hbo.com>
Committed: Tue Sep 9 16:03:34 2014 -0700
----------------------------------------------------------------------
.../src/jvm/storm/kafka/PartitionManager.java | 33 +++++++++++---------
1 file changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2aa03468/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 02edf31..c66545e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -223,7 +223,9 @@ public class PartitionManager {
LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
failed.add(offset);
MessageRetryRecord retryRecord = retryRecords.get(offset);
- retryRecords.put(offset, retryRecord == null ? new MessageRetryRecord() : retryRecord.retryAgainRecord());
+ retryRecords.put(offset, retryRecord == null
+ ? new MessageRetryRecord()
+ : retryRecord.createNextRetryRecord());
numberFailed++;
if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
throw new RuntimeException("Too many tuple failures");
@@ -296,27 +298,30 @@ public class PartitionManager {
* </ul>
*/
class MessageRetryRecord {
- private final long failTimeUTC;
- private final int attemptsAlreadyPerformed;
-
- private MessageRetryRecord(int attemptsAlreadyPerformed) {
- this.failTimeUTC = new Date().getTime();
- this.attemptsAlreadyPerformed = attemptsAlreadyPerformed;
- }
+ private final int retryNum;
+ private final long retryTimeUTC;
public MessageRetryRecord() {
this(1);
}
- public MessageRetryRecord retryAgainRecord() {
- return new MessageRetryRecord(this.attemptsAlreadyPerformed + 1);
+ private MessageRetryRecord(int retryNum) {
+ this.retryNum = retryNum;
+ this.retryTimeUTC = new Date().getTime() + calculateRetryDelay(this.retryNum);
}
- public boolean isReadyForRetry() {
- double delayMultiplier = Math.pow(_spoutConfig.retryDelayMultiplier, this.attemptsAlreadyPerformed - 1);
+ public MessageRetryRecord createNextRetryRecord() {
+ return new MessageRetryRecord(this.retryNum + 1);
+ }
+
+ private long calculateRetryDelay(int retryNum) {
+ double delayMultiplier = Math.pow(_spoutConfig.retryDelayMultiplier, retryNum - 1);
long delayThisRetryMs = (long) (_spoutConfig.retryInitialDelayMs * delayMultiplier);
- delayThisRetryMs = Math.min(delayThisRetryMs, _spoutConfig.retryDelayMaxMs);
- return new Date().getTime() - this.failTimeUTC > delayThisRetryMs;
+ return Math.min(delayThisRetryMs, _spoutConfig.retryDelayMaxMs);
+ }
+
+ public boolean isReadyForRetry() {
+ return new Date().getTime() > this.retryTimeUTC;
}
}
}