You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2017/02/08 20:51:22 UTC
[5/6] storm git commit: Merge branch 'cyz-dev' of
github.com:danny0405/storm into 1.x-branch
Merge branch 'cyz-dev' of github.com:danny0405/storm into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e372489c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e372489c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e372489c
Branch: refs/heads/1.x-branch
Commit: e372489c0fea259a5b2de4d42bc665593326ed8e
Parents: 657dd88 cf41f31
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Feb 8 15:14:43 2017 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Feb 8 15:14:43 2017 -0500
----------------------------------------------------------------------
.../src/jvm/org/apache/storm/kafka/PartitionManager.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e372489c/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 47b2dd2,40f664f..928a563
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@@ -232,9 -199,14 +232,14 @@@ public class PartitionManager
try {
msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
} catch (TopicOffsetOutOfRangeException e) {
- offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
+ long partitionLatestOffset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.LatestTime());
+ if (partitionLatestOffset < offset) {
+ offset = partitionLatestOffset;
+ } else {
+ offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
+ }
// fetch failed, so don't update the fetch metrics
-
+
//fix bug [STORM-643] : remove outdated failed offsets
if (!processingNewTuples) {
// For the case of EarliestTime it would be better to discard