You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/01/06 17:40:50 UTC
[03/11] storm git commit: Renaming UpdateOffsetException to
TopicOffsetOutOfRangeException.
Renaming UpdateOffsetException to TopicOffsetOutOfRangeException.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b2f48b41
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b2f48b41
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b2f48b41
Branch: refs/heads/master
Commit: b2f48b41f19398498c7ae41c2059f3685b87ac22
Parents: 86839dc
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Dec 8 15:06:26 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Dec 8 15:06:26 2014 -0800
----------------------------------------------------------------------
.../src/jvm/storm/kafka/KafkaUtils.java | 4 ++--
.../src/jvm/storm/kafka/PartitionManager.java | 4 +---
.../kafka/TopicOffsetOutOfRangeException.java | 25 ++++++++++++++++++++
.../jvm/storm/kafka/UpdateOffsetException.java | 25 --------------------
.../kafka/trident/TridentKafkaEmitter.java | 11 ++++-----
.../src/test/storm/kafka/KafkaUtilsTest.java | 2 +-
6 files changed, 33 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 9772c0d..b858639 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -155,7 +155,7 @@ public class KafkaUtils {
}
}
- public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) throws UpdateOffsetException {
+ public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) throws TopicOffsetOutOfRangeException {
ByteBufferMessageSet msgs = null;
String topic = config.topic;
int partitionId = partition.partition;
@@ -182,7 +182,7 @@ public class KafkaUtils {
if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
String msg = "Got fetch request with offset out of range: [" + offset + "]";
LOG.warn(msg);
- throw new UpdateOffsetException(msg);
+ throw new TopicOffsetOutOfRangeException(msg);
} else {
String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
LOG.error(message);
http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index d24a49e..3f9e410 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -23,11 +23,9 @@ import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.utils.Utils;
import com.google.common.collect.ImmutableMap;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.ByteBufferMessageSet$;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,7 +158,7 @@ public class PartitionManager {
ByteBufferMessageSet msgs = null;
try {
msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
- } catch (UpdateOffsetException e) {
+ } catch (TopicOffsetOutOfRangeException e) {
_emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
LOG.warn("Using new offset: {}", _emittedToOffset);
// fetch failed, so don't update the metrics
http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java b/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
new file mode 100644
index 0000000..2e6d2f5
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+public class TopicOffsetOutOfRangeException extends FailedFetchException {
+
+ public TopicOffsetOutOfRangeException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
deleted file mode 100644
index 5c366ec..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-public class UpdateOffsetException extends FailedFetchException {
-
- public UpdateOffsetException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 90d7f75..727ff8d 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -29,11 +29,8 @@ import kafka.message.Message;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import storm.kafka.DynamicPartitionConnections;
-import storm.kafka.FailedFetchException;
-import storm.kafka.KafkaUtils;
-import storm.kafka.Partition;
-import storm.kafka.UpdateOffsetException;
+import storm.kafka.*;
+import storm.kafka.TopicOffsetOutOfRangeException;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IOpaquePartitionedTridentSpout;
import storm.trident.spout.IPartitionedTridentSpout;
@@ -115,7 +112,7 @@ public class TridentKafkaEmitter {
ByteBufferMessageSet msgs = null;
try {
msgs = fetchMessages(consumer, partition, offset);
- } catch (UpdateOffsetException e) {
+ } catch (TopicOffsetOutOfRangeException e) {
long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
offset = newOffset;
@@ -167,7 +164,7 @@ public class TridentKafkaEmitter {
ByteBufferMessageSet msgs = null;
try {
msgs = fetchMessages(consumer, partition, offset);
- } catch (UpdateOffsetException e) {
+ } catch (TopicOffsetOutOfRangeException e) {
LOG.warn("OffsetOutOfRange during reEmitPartitionBatch, the transaction can not be replayed." +
"Returning empty messages");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index ae2fe6c..1f1bbbc 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -99,7 +99,7 @@ public class KafkaUtilsTest {
new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), -99);
}
- @Test(expected = UpdateOffsetException.class)
+ @Test(expected = TopicOffsetOutOfRangeException.class)
public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception {
config = new KafkaConfig(brokerHosts, "newTopic");
String value = "test";