You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/23 23:21:58 UTC
[03/21] storm git commit: STORM-616 : removing unintended changes.
STORM-616 : removing unintended changes.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d260759a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d260759a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d260759a
Branch: refs/heads/master
Commit: d260759ac203383e27668a7cb7090926029f7406
Parents: ab9f778
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Jan 5 22:31:05 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Jan 5 22:31:05 2015 -0500
----------------------------------------------------------------------
external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 7 +++----
.../src/jvm/storm/kafka/UpdateOffsetException.java | 5 +----
.../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java | 10 +---------
3 files changed, 5 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d260759a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 3165189..918da74 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,11 +180,10 @@ public class KafkaUtils {
if (fetchResponse.hasError()) {
KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
- String msg = "Got fetch request with offset out of range: [" + offset + "]; " +
+ LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
"retrying with default start offset time from configuration. " +
- "configured start offset time: [" + config.startOffsetTime + "]";
- LOG.warn(msg);
- throw new UpdateOffsetException(msg);
+ "configured start offset time: [" + config.startOffsetTime + "]");
+ throw new UpdateOffsetException();
} else {
String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
LOG.error(message);
http://git-wip-us.apache.org/repos/asf/storm/blob/d260759a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
index 5c366ec..1be7312 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -17,9 +17,6 @@
*/
package storm.kafka;
-public class UpdateOffsetException extends FailedFetchException {
+public class UpdateOffsetException extends RuntimeException {
- public UpdateOffsetException(String message) {
- super(message);
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d260759a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 34566c5..94bf134 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -33,7 +33,6 @@ import storm.kafka.DynamicPartitionConnections;
import storm.kafka.FailedFetchException;
import storm.kafka.KafkaUtils;
import storm.kafka.Partition;
-import storm.kafka.UpdateOffsetException;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IOpaquePartitionedTridentSpout;
import storm.trident.spout.IPartitionedTridentSpout;
@@ -130,14 +129,7 @@ public class TridentKafkaEmitter {
private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
long start = System.nanoTime();
- ByteBufferMessageSet msgs = null;
- try {
- msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
- } catch (UpdateOffsetException e) {
- long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
- LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset);
- msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset);
- }
+ ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
long end = System.nanoTime();
long millis = (end - start) / 1000000;
_kafkaMeanFetchLatencyMetric.update(millis);