You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/23 23:22:00 UTC
[05/21] storm git commit: Revert "STORM-616 : removing unintended changes."
Revert "STORM-616 : removing unintended changes."
This reverts commit d260759ac203383e27668a7cb7090926029f7406.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ca235e6c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ca235e6c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ca235e6c
Branch: refs/heads/master
Commit: ca235e6cb18006bbbac56361639309e73c196718
Parents: 079deda
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Jan 6 17:43:58 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Jan 6 17:43:58 2015 -0500
----------------------------------------------------------------------
external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 7 ++++---
.../src/jvm/storm/kafka/UpdateOffsetException.java | 5 ++++-
.../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java | 10 +++++++++-
3 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 918da74..3165189 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,10 +180,11 @@ public class KafkaUtils {
if (fetchResponse.hasError()) {
KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
- LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
+ String msg = "Got fetch request with offset out of range: [" + offset + "]; " +
"retrying with default start offset time from configuration. " +
- "configured start offset time: [" + config.startOffsetTime + "]");
- throw new UpdateOffsetException();
+ "configured start offset time: [" + config.startOffsetTime + "]";
+ LOG.warn(msg);
+ throw new UpdateOffsetException(msg);
} else {
String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
LOG.error(message);
http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
index 1be7312..5c366ec 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -17,6 +17,9 @@
*/
package storm.kafka;
-public class UpdateOffsetException extends RuntimeException {
+public class UpdateOffsetException extends FailedFetchException {
+ public UpdateOffsetException(String message) {
+ super(message);
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 94bf134..34566c5 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -33,6 +33,7 @@ import storm.kafka.DynamicPartitionConnections;
import storm.kafka.FailedFetchException;
import storm.kafka.KafkaUtils;
import storm.kafka.Partition;
+import storm.kafka.UpdateOffsetException;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IOpaquePartitionedTridentSpout;
import storm.trident.spout.IPartitionedTridentSpout;
@@ -129,7 +130,14 @@ public class TridentKafkaEmitter {
private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
long start = System.nanoTime();
- ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+ ByteBufferMessageSet msgs = null;
+ try {
+ msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+ } catch (UpdateOffsetException e) {
+ long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
+ LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset);
+ msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset);
+ }
long end = System.nanoTime();
long millis = (end - start) / 1000000;
_kafkaMeanFetchLatencyMetric.update(millis);