You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/23 23:22:00 UTC

[05/21] storm git commit: Revert "STORM-616 : removing unintended changes."

Revert "STORM-616 : removing unintended changes."

This reverts commit d260759ac203383e27668a7cb7090926029f7406.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ca235e6c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ca235e6c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ca235e6c

Branch: refs/heads/master
Commit: ca235e6cb18006bbbac56361639309e73c196718
Parents: 079deda
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Jan 6 17:43:58 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Jan 6 17:43:58 2015 -0500

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java  |  7 ++++---
 .../src/jvm/storm/kafka/UpdateOffsetException.java        |  5 ++++-
 .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java  | 10 +++++++++-
 3 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 918da74..3165189 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,10 +180,11 @@ public class KafkaUtils {
         if (fetchResponse.hasError()) {
             KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
-                LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
+                String msg = "Got fetch request with offset out of range: [" + offset + "]; " +
                         "retrying with default start offset time from configuration. " +
-                        "configured start offset time: [" + config.startOffsetTime + "]");
-                throw new UpdateOffsetException();
+                        "configured start offset time: [" + config.startOffsetTime + "]";
+                LOG.warn(msg);
+                throw new UpdateOffsetException(msg);
             } else {
                 String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                 LOG.error(message);

http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
index 1be7312..5c366ec 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -17,6 +17,9 @@
  */
 package storm.kafka;
 
-public class UpdateOffsetException extends RuntimeException {
+public class UpdateOffsetException extends FailedFetchException {
 
+    public UpdateOffsetException(String message) {
+        super(message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 94bf134..34566c5 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -33,6 +33,7 @@ import storm.kafka.DynamicPartitionConnections;
 import storm.kafka.FailedFetchException;
 import storm.kafka.KafkaUtils;
 import storm.kafka.Partition;
+import storm.kafka.UpdateOffsetException;
 import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;
 import storm.trident.spout.IPartitionedTridentSpout;
@@ -129,7 +130,14 @@ public class TridentKafkaEmitter {
 
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
         long start = System.nanoTime();
-        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+        ByteBufferMessageSet msgs = null;
+        try {
+            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+        } catch (UpdateOffsetException e) {
+            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
+            LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset);
+            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset);
+        }
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _kafkaMeanFetchLatencyMetric.update(millis);