You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/01/06 17:40:54 UTC
[07/11] storm git commit: Forcing the startTime = earliestTime when the offset is outOfRange.
Forcing the startTime = earliestTime when the offset is outOfRange.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7c8f25f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7c8f25f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7c8f25f
Branch: refs/heads/master
Commit: a7c8f25f3032db7658ea12f00acfdea3dfd383fc
Parents: cb548dd
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Dec 18 10:11:07 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Thu Dec 18 10:11:17 2014 -0800
----------------------------------------------------------------------
external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 2 +-
.../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a7c8f25f/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 3f9e410..54a61f4 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -159,7 +159,7 @@ public class PartitionManager {
try {
msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
} catch (TopicOffsetOutOfRangeException e) {
- _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
+ _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
LOG.warn("Using new offset: {}", _emittedToOffset);
// fetch failed, so don't update the metrics
return;
http://git-wip-us.apache.org/repos/asf/storm/blob/a7c8f25f/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 727ff8d..db4299a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -113,7 +113,7 @@ public class TridentKafkaEmitter {
try {
msgs = fetchMessages(consumer, partition, offset);
} catch (TopicOffsetOutOfRangeException e) {
- long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
+ long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
offset = newOffset;
msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);