You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/01/06 17:40:50 UTC

[03/11] storm git commit: Renaming UpdateOffsetException to TopicOffsetOutOfRangeException.

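Renaming UpdateOffsetException to TopicOffsetOutOfRangeException.

KafkaUtils.fetchMessages throws this exception when the fetch error is
OFFSET_OUT_OF_RANGE and config.useStartOffsetTimeIfOffsetOutOfRange is set.
The catch blocks in PartitionManager and TridentKafkaEmitter and the expected
exception in KafkaUtilsTest are updated to the new name. The commit also
removes two imports from PartitionManager and replaces the individual
storm.kafka imports in TridentKafkaEmitter with a wildcard import.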


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b2f48b41
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b2f48b41
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b2f48b41

Branch: refs/heads/master
Commit: b2f48b41f19398498c7ae41c2059f3685b87ac22
Parents: 86839dc
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Dec 8 15:06:26 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Dec 8 15:06:26 2014 -0800

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/KafkaUtils.java         |  4 ++--
 .../src/jvm/storm/kafka/PartitionManager.java   |  4 +---
 .../kafka/TopicOffsetOutOfRangeException.java   | 25 ++++++++++++++++++++
 .../jvm/storm/kafka/UpdateOffsetException.java  | 25 --------------------
 .../kafka/trident/TridentKafkaEmitter.java      | 11 ++++-----
 .../src/test/storm/kafka/KafkaUtilsTest.java    |  2 +-
 6 files changed, 33 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 9772c0d..b858639 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -155,7 +155,7 @@ public class KafkaUtils {
         }
     }
 
-    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) throws UpdateOffsetException {
+    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) throws TopicOffsetOutOfRangeException {
         ByteBufferMessageSet msgs = null;
         String topic = config.topic;
         int partitionId = partition.partition;
@@ -182,7 +182,7 @@ public class KafkaUtils {
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
                 String msg = "Got fetch request with offset out of range: [" + offset + "]";
                 LOG.warn(msg);
-                throw new UpdateOffsetException(msg);
+                throw new TopicOffsetOutOfRangeException(msg);
             } else {
                 String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                 LOG.error(message);

http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index d24a49e..3f9e410 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -23,11 +23,9 @@ import backtype.storm.metric.api.CountMetric;
 import backtype.storm.metric.api.MeanReducer;
 import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.utils.Utils;
 import com.google.common.collect.ImmutableMap;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.ByteBufferMessageSet$;
 import kafka.message.MessageAndOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,7 +158,7 @@ public class PartitionManager {
         ByteBufferMessageSet msgs = null;
         try {
             msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
-        } catch (UpdateOffsetException e) {
+        } catch (TopicOffsetOutOfRangeException e) {
             _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
             LOG.warn("Using new offset: {}", _emittedToOffset);
             // fetch failed, so don't update the metrics

http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java b/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
new file mode 100644
index 0000000..2e6d2f5
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+public class TopicOffsetOutOfRangeException extends FailedFetchException {
+
+    public TopicOffsetOutOfRangeException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
deleted file mode 100644
index 5c366ec..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-public class UpdateOffsetException extends FailedFetchException {
-
-    public UpdateOffsetException(String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 90d7f75..727ff8d 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -29,11 +29,8 @@ import kafka.message.Message;
 import kafka.message.MessageAndOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import storm.kafka.DynamicPartitionConnections;
-import storm.kafka.FailedFetchException;
-import storm.kafka.KafkaUtils;
-import storm.kafka.Partition;
-import storm.kafka.UpdateOffsetException;
+import storm.kafka.*;
+import storm.kafka.TopicOffsetOutOfRangeException;
 import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;
 import storm.trident.spout.IPartitionedTridentSpout;
@@ -115,7 +112,7 @@ public class TridentKafkaEmitter {
         ByteBufferMessageSet msgs = null;
         try {
             msgs = fetchMessages(consumer, partition, offset);
-        } catch (UpdateOffsetException e) {
+        } catch (TopicOffsetOutOfRangeException e) {
             long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
             LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
             offset = newOffset;
@@ -167,7 +164,7 @@ public class TridentKafkaEmitter {
             ByteBufferMessageSet msgs = null;
             try {
                 msgs = fetchMessages(consumer, partition, offset);
-            } catch (UpdateOffsetException e) {
+            } catch (TopicOffsetOutOfRangeException e) {
                 LOG.warn("OffsetOutOfRange during reEmitPartitionBatch, the transaction can not be replayed." +
                         "Returning empty messages");
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index ae2fe6c..1f1bbbc 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -99,7 +99,7 @@ public class KafkaUtilsTest {
                 new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), -99);
     }
 
-    @Test(expected = UpdateOffsetException.class)
+    @Test(expected = TopicOffsetOutOfRangeException.class)
     public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception {
         config = new KafkaConfig(brokerHosts, "newTopic");
         String value = "test";