You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2017/02/08 20:51:22 UTC

[5/6] storm git commit: Merge branch 'cyz-dev' of github.com:danny0405/storm into 1.x-branch

Merge branch 'cyz-dev' of github.com:danny0405/storm into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e372489c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e372489c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e372489c

Branch: refs/heads/1.x-branch
Commit: e372489c0fea259a5b2de4d42bc665593326ed8e
Parents: 657dd88 cf41f31
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Feb 8 15:14:43 2017 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Feb 8 15:14:43 2017 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/kafka/PartitionManager.java      | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e372489c/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 47b2dd2,40f664f..928a563
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@@ -232,9 -199,14 +232,14 @@@ public class PartitionManager 
          try {
              msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
          } catch (TopicOffsetOutOfRangeException e) {
-             offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
+             long partitionLatestOffset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.LatestTime());
+             if (partitionLatestOffset < offset) {
+                 offset = partitionLatestOffset;
+             } else {
+                 offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
+             }
              // fetch failed, so don't update the fetch metrics
 -            
 +
              //fix bug [STORM-643] : remove outdated failed offsets
              if (!processingNewTuples) {
                  // For the case of EarliestTime it would be better to discard