You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/10/27 21:48:11 UTC

[4/7] git commit: move new offset calculation to PartitionManager; Don't update metrics on failed fetchå

move new offset calculation to PartitionManager; Don't update metrics on failed fetchå


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/67b5f56c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/67b5f56c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/67b5f56c

Branch: refs/heads/master
Commit: 67b5f56c1ff3905c88eb85dbb8985c7bb8342de9
Parents: 1897dee
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Oct 23 16:27:07 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Oct 23 16:27:07 2014 -0400

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java        | 5 ++---
 external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java  | 5 ++++-
 .../storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java  | 5 -----
 3 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/67b5f56c/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index eab80eb..918da74 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,11 +180,10 @@ public class KafkaUtils {
         if (fetchResponse.hasError()) {
             KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
-                long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
                 LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                         "retrying with default start offset time from configuration. " +
-                        "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
-                throw new UpdateOffsetException(startOffset);
+                        "configured start offset time: [" + config.startOffsetTime + "]");
+                throw new UpdateOffsetException();
             } else {
                 String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                 LOG.error(message);

http://git-wip-us.apache.org/repos/asf/storm/blob/67b5f56c/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index fa5f7e5..d24a49e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -161,7 +161,10 @@ public class PartitionManager {
         try {
             msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
         } catch (UpdateOffsetException e) {
-            _emittedToOffset = e.startOffset;
+            _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
+            LOG.warn("Using new offset: {}", _emittedToOffset);
+            // fetch failed, so don't update the metrics
+            return;
         }
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;

http://git-wip-us.apache.org/repos/asf/storm/blob/67b5f56c/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
index 69d8950..510c8cd 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -2,9 +2,4 @@ package storm.kafka;
 
 public class UpdateOffsetException extends RuntimeException {
 
-    public final Long startOffset;
-
-    public UpdateOffsetException(Long _offset) {
-        this.startOffset = _offset;
-    }
 }