You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/01/06 17:40:54 UTC

[07/11] storm git commit: Forcing the startTime = earliestTime when the offset is outOfRange.

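Force the start time to the earliest time when the offset is out of range: in the TopicOffsetOutOfRangeException handlers, KafkaUtils.getOffset is now called with kafka.api.OffsetRequest.EarliestTime() instead of the config object.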


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7c8f25f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7c8f25f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7c8f25f

Branch: refs/heads/master
Commit: a7c8f25f3032db7658ea12f00acfdea3dfd383fc
Parents: cb548dd
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Dec 18 10:11:07 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Thu Dec 18 10:11:17 2014 -0800

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java     | 2 +-
 .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java           | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a7c8f25f/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 3f9e410..54a61f4 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -159,7 +159,7 @@ public class PartitionManager {
         try {
             msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
         } catch (TopicOffsetOutOfRangeException e) {
-            _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
+            _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
             LOG.warn("Using new offset: {}", _emittedToOffset);
             // fetch failed, so don't update the metrics
             return;

http://git-wip-us.apache.org/repos/asf/storm/blob/a7c8f25f/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 727ff8d..db4299a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -113,7 +113,7 @@ public class TridentKafkaEmitter {
         try {
             msgs = fetchMessages(consumer, partition, offset);
         } catch (TopicOffsetOutOfRangeException e) {
-            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
+            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
             LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
             offset = newOffset;
             msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);