Posted to commits@storm.apache.org by bo...@apache.org on 2015/01/14 17:05:55 UTC

[34/44] storm git commit: stop if retries hit TOPOLOGY_MESSAGE_TIMEOUT_SECS

stop if retries hit TOPOLOGY_MESSAGE_TIMEOUT_SECS

Print a loud error message with a stack trace if retries run up
against the TOPOLOGY_MESSAGE_TIMEOUT_SECS timeout, and stop retrying.
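
For reference, the timeout in question is Storm's standard per-topology message timeout. A minimal sketch of setting it when submitting a topology; the class name, topology name, and 60-second value below are placeholders, not taken from this commit:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;

    public class MessageTimeoutExample {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);  // spout wiring omitted

            Config conf = new Config();
            // Every tuple tree, including all KafkaSpout retries for a message, must finish
            // within this window; otherwise the spout gives up on further retries per this commit.
            conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);

            StormSubmitter.submitTopology("kafka-retry-example", conf, builder.createTopology());
        }
    }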


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/882dfc5f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/882dfc5f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/882dfc5f

Branch: refs/heads/master
Commit: 882dfc5fb7b3b26a17e4a1ee5f91220cd80fc4dd
Parents: 3c699e3
Author: Rick Kilgore <ri...@hbo.com>
Authored: Thu Oct 2 23:12:01 2014 -0700
Committer: Rick Kilgore <ri...@hbo.com>
Committed: Thu Oct 2 23:12:01 2014 -0700

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/PartitionManager.java   | 96 +++++++++++++++++---
 1 file changed, 81 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/882dfc5f/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 6b0930b..cff6df0 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -38,13 +38,24 @@ import java.util.*;
 
 public class PartitionManager {
     public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);
+    private static final String TIMES_UP_MSG =
+            "Retry logic in your topology is taking longer to complete than is allowed by your"
+            +" Storm Config setting TOPOLOGY_MESSAGE_TIMEOUT_SECS (%s seconds).  (i.e., you have"
+            +" called OutputCollector.fail() too many times for this message).  KafkaSpout has"
+            +" aborted next retry attempt (retry %s) for the Kafka message at offset %s since it"
+            +" would occur after this timeout.";
+    private static final long TIMEOUT_RESET_VALUE = -1L;
+
     private final CombinedMetric _fetchAPILatencyMax;
     private final ReducedMetric _fetchAPILatencyMean;
     private final CountMetric _fetchAPICallCount;
     private final CountMetric _fetchAPIMessageCount;
     Long _emittedToOffset;
-    SortedSet<Long> _pending = new TreeSet<Long>();
+    // _pending key = Kafka offset, value = time at which the message was first submitted to the topology
+    private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
     private SortedSet<Long> failed = new TreeSet<Long>();
+
+    // retryRecords key = Kafka offset, value = retry info for the given message
     private Map<Long,MessageRetryRecord> retryRecords = new HashMap<Long,MessageRetryRecord>();
     Long _committedTo;
     LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
@@ -190,7 +201,9 @@ public class PartitionManager {
                 }
                 if (!had_failed || failedReady.contains(cur_offset)) {
                     numMessages += 1;
-                    _pending.add(cur_offset);
+                    if (!_pending.containsKey(cur_offset)) {
+                        _pending.put(cur_offset, System.currentTimeMillis());
+                    }
                     _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
                     _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
                     if (had_failed) {
@@ -203,9 +216,9 @@ public class PartitionManager {
     }
 
     public void ack(Long offset) {
-        if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
+        if (!_pending.isEmpty() && _pending.firstKey() < offset - _spoutConfig.maxOffsetBehind) {
             // Too many things pending!
-            _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
+            _pending.headMap(offset - _spoutConfig.maxOffsetBehind).clear();
         }
         _pending.remove(offset);
         retryRecords.remove(offset);
@@ -221,15 +234,23 @@ public class PartitionManager {
             );
         } else {
             LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
-            failed.add(offset);
-            MessageRetryRecord retryRecord = retryRecords.get(offset);
-            retryRecords.put(offset, retryRecord == null
-                                     ? new MessageRetryRecord()
-                                     : retryRecord.createNextRetryRecord());
             numberFailed++;
             if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
                 throw new RuntimeException("Too many tuple failures");
             }
+
+            try {
+                MessageRetryRecord retryRecord = retryRecords.get(offset);
+                retryRecord = retryRecord == null
+                              ? new MessageRetryRecord(offset)
+                              : retryRecord.createNextRetryRecord();
+
+                retryRecords.put(offset, retryRecord);
+                failed.add(offset);
+
+            } catch (MessageRetryRecord.AvailableRetryTimeExceededException e) {
+                LOG.error("cannot retry", e);
+            }
         }
     }
 
@@ -262,7 +283,7 @@ public class PartitionManager {
         if (_pending.isEmpty()) {
             return _emittedToOffset;
         } else {
-            return _pending.first();
+            return _pending.firstKey();
         }
     }
 
@@ -298,20 +319,59 @@ public class PartitionManager {
      * </ul>
      */
     class MessageRetryRecord {
+        private final long offset;
         private final int retryNum;
         private final long retryTimeUTC;
 
-        public MessageRetryRecord() {
-            this(1);
+        public MessageRetryRecord(long offset) throws AvailableRetryTimeExceededException {
+            this(offset, 1);
         }
 
-        private MessageRetryRecord(int retryNum) {
+        private MessageRetryRecord(long offset, int retryNum) throws AvailableRetryTimeExceededException {
+            this.offset = offset;
             this.retryNum = retryNum;
             this.retryTimeUTC = System.currentTimeMillis() + calculateRetryDelay();
+            validateRetryTime();
+        }
+
+        /**
+         * Create a MessageRetryRecord for the next retry that should occur after this one.
+         * @return MessageRetryRecord with the next retry time.
+         * @throws AvailableRetryTimeExceededException if performing another retry would run past
+         *         the backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS set in the Storm
+         *         configuration, in which case the message should not be retried again.
+         */
+        public MessageRetryRecord createNextRetryRecord() throws AvailableRetryTimeExceededException {
+            return new MessageRetryRecord(this.offset, this.retryNum + 1);
         }
 
-        public MessageRetryRecord createNextRetryRecord() {
-            return new MessageRetryRecord(this.retryNum + 1);
+        private void validateRetryTime() throws AvailableRetryTimeExceededException {
+            long stormStartTime = PartitionManager.this._pending.get(this.offset);
+
+            if (stormStartTime == TIMEOUT_RESET_VALUE) {
+                // This is a resubmission from the Storm framework after Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
+                // has elapsed.  Restart my timer.
+                PartitionManager.this._pending.put(this.offset, System.currentTimeMillis());
+
+            } else {
+                int timeoutSeconds = Utils.getInt(_stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+                if (this.retryTimeUTC - stormStartTime > timeoutSeconds * 1000) {
+
+                    // Prepare for when the Storm framework calls fail()
+                    _pending.put(this.offset, TIMEOUT_RESET_VALUE);
+
+                    throw new AvailableRetryTimeExceededException(String.format(TIMES_UP_MSG,
+                                                                                timeoutSeconds,
+                                                                                this.retryNum,
+                                                                                this.offset));
+
+                } else {
+                    LOG.warn(String.format("allowing another retry: start=%s, retryTime=%s, timeoutSeconds=%s",
+                                           (stormStartTime / 1000) % 1000,
+                                           (this.retryTimeUTC / 1000) % 1000,
+                                           timeoutSeconds));
+                }
+            }
         }
 
         private long calculateRetryDelay() {
@@ -323,5 +383,11 @@ public class PartitionManager {
         public boolean isReadyForRetry() {
             return System.currentTimeMillis() > this.retryTimeUTC;
         }
+
+        class AvailableRetryTimeExceededException extends Exception {
+            public AvailableRetryTimeExceededException(String msg) {
+                super(msg);
+            }
+        }
     }
 }
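
The heart of the change is the check in validateRetryTime(): the proposed retry time is compared against the moment the message was first handed to the topology plus TOPOLOGY_MESSAGE_TIMEOUT_SECS. A simplified, self-contained sketch of that check follows; the backoff formula is a made-up placeholder, since calculateRetryDelay() is not shown in this diff, and in the real code firstEmitMillis comes from the _pending map and timeoutSecs from the Storm config:

    public class RetryCutoffSketch {

        static boolean retryWouldExceedTimeout(long firstEmitMillis, int retryNum, int timeoutSecs) {
            long delayMillis = 1000L * (1L << retryNum);              // hypothetical exponential backoff
            long retryTimeMillis = System.currentTimeMillis() + delayMillis;
            return retryTimeMillis - firstEmitMillis > timeoutSecs * 1000L;
        }

        public static void main(String[] args) {
            long firstEmit = System.currentTimeMillis() - 55000L;     // message first emitted 55s ago
            if (retryWouldExceedTimeout(firstEmit, 5, 60)) {
                System.err.println("Next retry would overrun TOPOLOGY_MESSAGE_TIMEOUT_SECS; stop retrying.");
            } else {
                System.out.println("Next retry still fits inside the message timeout window.");
            }
        }
    }

With a message first emitted 55 seconds ago, a 60-second timeout, and a 32-second hypothetical backoff for retry 5, the sketch takes the abort branch, which corresponds to the situation where the patch logs TIMES_UP_MSG and stops retrying.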