You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/01/06 23:44:19 UTC

[09/11] storm git commit: Revert "STORM-586: TridentKafkaEmitter should catch updateOffsetException."

Revert "STORM-586: TridentKafkaEmitter should catch updateOffsetException."

This reverts commit 65e9f0c814b2cddc772880042259b66194fd6fb7.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b272f35b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b272f35b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b272f35b

Branch: refs/heads/master
Commit: b272f35bed7688427b2bb88904f632ff7cd01d82
Parents: 5c69677
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Fri Dec 12 10:43:06 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 12 10:43:06 2014 -0800

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java  |  7 +++----
 .../src/jvm/storm/kafka/UpdateOffsetException.java        |  5 +----
 .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java  | 10 +---------
 3 files changed, 5 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b272f35b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 3165189..918da74 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,11 +180,10 @@ public class KafkaUtils {
         if (fetchResponse.hasError()) {
             KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
-                String msg = "Got fetch request with offset out of range: [" + offset + "]; " +
+                LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                         "retrying with default start offset time from configuration. " +
-                        "configured start offset time: [" + config.startOffsetTime + "]";
-                LOG.warn(msg);
-                throw new UpdateOffsetException(msg);
+                        "configured start offset time: [" + config.startOffsetTime + "]");
+                throw new UpdateOffsetException();
             } else {
                 String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                 LOG.error(message);

http://git-wip-us.apache.org/repos/asf/storm/blob/b272f35b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
index 5c366ec..1be7312 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -17,9 +17,6 @@
  */
 package storm.kafka;
 
-public class UpdateOffsetException extends FailedFetchException {
+public class UpdateOffsetException extends RuntimeException {
 
-    public UpdateOffsetException(String message) {
-        super(message);
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b272f35b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 34566c5..94bf134 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -33,7 +33,6 @@ import storm.kafka.DynamicPartitionConnections;
 import storm.kafka.FailedFetchException;
 import storm.kafka.KafkaUtils;
 import storm.kafka.Partition;
-import storm.kafka.UpdateOffsetException;
 import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;
 import storm.trident.spout.IPartitionedTridentSpout;
@@ -130,14 +129,7 @@ public class TridentKafkaEmitter {
 
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
         long start = System.nanoTime();
-        ByteBufferMessageSet msgs = null;
-        try {
-            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
-        } catch (UpdateOffsetException e) {
-            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
-            LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset);
-            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset);
-        }
+        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _kafkaMeanFetchLatencyMetric.update(millis);