You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/10/27 21:48:11 UTC
[4/7] git commit: move new offset calculation to PartitionManager; Don't update metrics on failed fetchå
move new offset calculation to PartitionManager; Don't update metrics on failed fetchå
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/67b5f56c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/67b5f56c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/67b5f56c
Branch: refs/heads/master
Commit: 67b5f56c1ff3905c88eb85dbb8985c7bb8342de9
Parents: 1897dee
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Oct 23 16:27:07 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Oct 23 16:27:07 2014 -0400
----------------------------------------------------------------------
external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 5 ++---
external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 5 ++++-
.../storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java | 5 -----
3 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/67b5f56c/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index eab80eb..918da74 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,11 +180,10 @@ public class KafkaUtils {
if (fetchResponse.hasError()) {
KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
- long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
"retrying with default start offset time from configuration. " +
- "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
- throw new UpdateOffsetException(startOffset);
+ "configured start offset time: [" + config.startOffsetTime + "]");
+ throw new UpdateOffsetException();
} else {
String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
LOG.error(message);
http://git-wip-us.apache.org/repos/asf/storm/blob/67b5f56c/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index fa5f7e5..d24a49e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -161,7 +161,10 @@ public class PartitionManager {
try {
msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
} catch (UpdateOffsetException e) {
- _emittedToOffset = e.startOffset;
+ _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
+ LOG.warn("Using new offset: {}", _emittedToOffset);
+ // fetch failed, so don't update the metrics
+ return;
}
long end = System.nanoTime();
long millis = (end - start) / 1000000;
http://git-wip-us.apache.org/repos/asf/storm/blob/67b5f56c/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
index 69d8950..510c8cd 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -2,9 +2,4 @@ package storm.kafka;
public class UpdateOffsetException extends RuntimeException {
- public final Long startOffset;
-
- public UpdateOffsetException(Long _offset) {
- this.startOffset = _offset;
- }
}