Posted to commits@storm.apache.org by bo...@apache.org on 2015/01/14 17:05:55 UTC
[34/44] storm git commit: stop if retries hit TOPOLOGY_MESSAGE_TIMEOUT_SECS
stop if retries hit TOPOLOGY_MESSAGE_TIMEOUT_SECS
Print a loud error message with a stack trace if the retries run up
against the TOPOLOGY_MESSAGE_TIMEOUT_SECS timeout, and stop retrying.
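----------------------------------------------------------------------
In outline: the spout now records when each offset first enters the topology,
and before scheduling another backed-off retry it checks whether that retry
would land past TOPOLOGY_MESSAGE_TIMEOUT_SECS; if so, it logs the error and
gives up on the message. A minimal standalone sketch of that check
(illustrative only; the class and member names below are placeholders, not
the committed code):

    import java.util.HashMap;
    import java.util.Map;

    class RetryWindowSketch {
        private final long messageTimeoutMs; // TOPOLOGY_MESSAGE_TIMEOUT_SECS * 1000
        // offset -> wall-clock time the message was first emitted to the topology
        private final Map<Long, Long> firstEmitTimeMs = new HashMap<Long, Long>();

        RetryWindowSketch(long messageTimeoutMs) {
            this.messageTimeoutMs = messageTimeoutMs;
        }

        void emitted(long offset) {
            // Only record the first emit; resubmissions keep the original clock.
            if (!firstEmitTimeMs.containsKey(offset)) {
                firstEmitTimeMs.put(offset, System.currentTimeMillis());
            }
        }

        // True if one more retry, scheduled at nextRetryTimeMs, would still land
        // inside the message timeout window; callers should abort (and log
        // loudly) when this returns false.
        boolean retryStillAllowed(long offset, long nextRetryTimeMs) {
            Long start = firstEmitTimeMs.get(offset);
            return start != null && nextRetryTimeMs - start <= messageTimeoutMs;
        }
    }
----------------------------------------------------------------------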
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/882dfc5f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/882dfc5f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/882dfc5f
Branch: refs/heads/master
Commit: 882dfc5fb7b3b26a17e4a1ee5f91220cd80fc4dd
Parents: 3c699e3
Author: Rick Kilgore <ri...@hbo.com>
Authored: Thu Oct 2 23:12:01 2014 -0700
Committer: Rick Kilgore <ri...@hbo.com>
Committed: Thu Oct 2 23:12:01 2014 -0700
----------------------------------------------------------------------
.../src/jvm/storm/kafka/PartitionManager.java | 96 +++++++++++++++++---
1 file changed, 81 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/882dfc5f/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 6b0930b..cff6df0 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -38,13 +38,24 @@ import java.util.*;
public class PartitionManager {
public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);
+ private static final String TIMES_UP_MSG =
+ "Retry logic in your topology is taking longer to complete than is allowed by your"
+ +" Storm Config setting TOPOLOGY_MESSAGE_TIMEOUT_SECS (%s seconds). (i.e., you have"
+ +" called OutputCollector.fail() too many times for this message). KafkaSpout has"
+ +" aborted next retry attempt (retry %s) for the Kafka message at offset %s since it"
+ +" would occur after this timeout.";
+ private static final long TIMEOUT_RESET_VALUE = -1L;
+
private final CombinedMetric _fetchAPILatencyMax;
private final ReducedMetric _fetchAPILatencyMean;
private final CountMetric _fetchAPICallCount;
private final CountMetric _fetchAPIMessageCount;
Long _emittedToOffset;
- SortedSet<Long> _pending = new TreeSet<Long>();
+ // _pending key = Kafka offset, value = time at which the message was first submitted to the topology
+ private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
private SortedSet<Long> failed = new TreeSet<Long>();
+
+ // retryRecords key = Kafka offset, value = retry info for the given message
private Map<Long,MessageRetryRecord> retryRecords = new HashMap<Long,MessageRetryRecord>();
Long _committedTo;
LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
@@ -190,7 +201,9 @@ public class PartitionManager {
}
if (!had_failed || failedReady.contains(cur_offset)) {
numMessages += 1;
- _pending.add(cur_offset);
+ if (!_pending.containsKey(cur_offset)) {
+ _pending.put(cur_offset, System.currentTimeMillis());
+ }
_waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
_emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
if (had_failed) {
@@ -203,9 +216,9 @@ public class PartitionManager {
}
public void ack(Long offset) {
- if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
+ if (!_pending.isEmpty() && _pending.firstKey() < offset - _spoutConfig.maxOffsetBehind) {
// Too many things pending!
- _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
+ _pending.headMap(offset - _spoutConfig.maxOffsetBehind).clear();
}
_pending.remove(offset);
retryRecords.remove(offset);
@@ -221,15 +234,23 @@ public class PartitionManager {
);
} else {
LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
- failed.add(offset);
- MessageRetryRecord retryRecord = retryRecords.get(offset);
- retryRecords.put(offset, retryRecord == null
- ? new MessageRetryRecord()
- : retryRecord.createNextRetryRecord());
numberFailed++;
if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
throw new RuntimeException("Too many tuple failures");
}
+
+ try {
+ MessageRetryRecord retryRecord = retryRecords.get(offset);
+ retryRecord = retryRecord == null
+ ? new MessageRetryRecord(offset)
+ : retryRecord.createNextRetryRecord();
+
+ retryRecords.put(offset, retryRecord);
+ failed.add(offset);
+
+ } catch (MessageRetryRecord.AvailableRetryTimeExceededException e) {
+ LOG.error("cannot retry", e);
+ }
}
}
@@ -262,7 +283,7 @@ public class PartitionManager {
if (_pending.isEmpty()) {
return _emittedToOffset;
} else {
- return _pending.first();
+ return _pending.firstKey();
}
}
@@ -298,20 +319,59 @@ public class PartitionManager {
* </ul>
*/
class MessageRetryRecord {
+ private final long offset;
private final int retryNum;
private final long retryTimeUTC;
- public MessageRetryRecord() {
- this(1);
+ public MessageRetryRecord(long offset) throws AvailableRetryTimeExceededException {
+ this(offset, 1);
}
- private MessageRetryRecord(int retryNum) {
+ private MessageRetryRecord(long offset, int retryNum) throws AvailableRetryTimeExceededException {
+ this.offset = offset;
this.retryNum = retryNum;
this.retryTimeUTC = System.currentTimeMillis() + calculateRetryDelay();
+ validateRetryTime();
+ }
+
+ /**
+ * Create a MessageRetryRecord for the next retry that should occur after this one.
+ * @return MessageRetryRecord with the next retry time.
+ * @throws AvailableRetryTimeExceededException to indicate that another retry
+ * should not be performed, because it would run past the
+ * backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS setting in the Storm
+ * configuration.
+ */
+ public MessageRetryRecord createNextRetryRecord() throws AvailableRetryTimeExceededException {
+ return new MessageRetryRecord(this.offset, this.retryNum + 1);
}
- public MessageRetryRecord createNextRetryRecord() {
- return new MessageRetryRecord(this.retryNum + 1);
+ private void validateRetryTime() throws AvailableRetryTimeExceededException {
+ long stormStartTime = PartitionManager.this._pending.get(this.offset);
+
+ if (stormStartTime == TIMEOUT_RESET_VALUE) {
+ // This is a resubmission from the Storm framework after Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
+ // has elapsed. Restart my timer.
+ PartitionManager.this._pending.put(this.offset, System.currentTimeMillis());
+
+ } else {
+ int timeoutSeconds = Utils.getInt(_stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+ if (this.retryTimeUTC - stormStartTime > timeoutSeconds * 1000) {
+
+ // Prepare for when the Storm framework calls fail()
+ _pending.put(this.offset, TIMEOUT_RESET_VALUE);
+
+ throw new AvailableRetryTimeExceededException(String.format(TIMES_UP_MSG,
+ timeoutSeconds,
+ this.retryNum,
+ this.offset));
+
+ } else {
+ LOG.warn(String.format("allowing another retry: start=%s, retryTime=%s, timeoutSeconds=%s",
+ (stormStartTime / 1000) % 1000,
+ (this.retryTimeUTC / 1000) % 1000,
+ timeoutSeconds));
+ }
+ }
}
private long calculateRetryDelay() {
@@ -323,5 +383,11 @@ public class PartitionManager {
public boolean isReadyForRetry() {
return System.currentTimeMillis() > this.retryTimeUTC;
}
+
+ class AvailableRetryTimeExceededException extends Exception {
+ public AvailableRetryTimeExceededException(String msg) {
+ super(msg);
+ }
+ }
}
}
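----------------------------------------------------------------------
For topology authors, the practical effect is that a tuple whose retries would
outlast the message timeout is abandoned with the TIMES_UP_MSG error above
rather than retried indefinitely; the retry window is the standard Storm
message timeout. A hedged example of widening it (the class wrapper and the
120-second value are illustrative; only the Config key comes from the code
above):

    import backtype.storm.Config;

    public class TimeoutConfigExample {
        public static Config buildConf() {
            Config conf = new Config();
            // Config extends HashMap, so the timeout can be set directly;
            // this gives KafkaSpout's backed-off retries more headroom
            // before a message is abandoned.
            conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 120);
            return conf;
        }
    }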