You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/01/06 17:40:49 UTC
[02/11] storm git commit: Ading special case for retry batch,
in case of trident a transaction retry should not jump the offset
requested as part of retry.
Ading special case for retry batch, in case of trident a transaction retry should not jump the offset requested as part of retry.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/86839dc6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/86839dc6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/86839dc6
Branch: refs/heads/master
Commit: 86839dc6b789045a13cf28cba008e52c4d835fa4
Parents: 65e9f0c
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Dec 8 14:49:29 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Dec 8 14:49:29 2014 -0800
----------------------------------------------------------------------
.../src/jvm/storm/kafka/KafkaUtils.java | 4 +-
.../kafka/trident/TridentKafkaEmitter.java | 47 +++++++++++++-------
2 files changed, 31 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/86839dc6/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 3165189..9772c0d 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,9 +180,7 @@ public class KafkaUtils {
if (fetchResponse.hasError()) {
KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
- String msg = "Got fetch request with offset out of range: [" + offset + "]; " +
- "retrying with default start offset time from configuration. " +
- "configured start offset time: [" + config.startOffsetTime + "]";
+ String msg = "Got fetch request with offset out of range: [" + offset + "]";
LOG.warn(msg);
throw new UpdateOffsetException(msg);
} else {
http://git-wip-us.apache.org/repos/asf/storm/blob/86839dc6/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 34566c5..90d7f75 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -111,7 +111,17 @@ public class TridentKafkaEmitter {
} else {
offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
}
- ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
+
+ ByteBufferMessageSet msgs = null;
+ try {
+ msgs = fetchMessages(consumer, partition, offset);
+ } catch (UpdateOffsetException e) {
+ long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
+ LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
+ offset = newOffset;
+ msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+ }
+
long endoffset = offset;
for (MessageAndOffset msg : msgs) {
emit(collector, msg.message());
@@ -131,13 +141,7 @@ public class TridentKafkaEmitter {
private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
long start = System.nanoTime();
ByteBufferMessageSet msgs = null;
- try {
- msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
- } catch (UpdateOffsetException e) {
- long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
- LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset);
- msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset);
- }
+ msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
long end = System.nanoTime();
long millis = (end - start) / 1000000;
_kafkaMeanFetchLatencyMetric.update(millis);
@@ -160,16 +164,25 @@ public class TridentKafkaEmitter {
SimpleConsumer consumer = _connections.register(partition);
long offset = (Long) meta.get("offset");
long nextOffset = (Long) meta.get("nextOffset");
- ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
- for (MessageAndOffset msg : msgs) {
- if (offset == nextOffset) {
- break;
- }
- if (offset > nextOffset) {
- throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
+ ByteBufferMessageSet msgs = null;
+ try {
+ msgs = fetchMessages(consumer, partition, offset);
+ } catch (UpdateOffsetException e) {
+ LOG.warn("OffsetOutOfRange during reEmitPartitionBatch, the transaction can not be replayed." +
+ "Returning empty messages");
+ }
+
+ if(msgs != null) {
+ for (MessageAndOffset msg : msgs) {
+ if (offset == nextOffset) {
+ break;
+ }
+ if (offset > nextOffset) {
+ throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
+ }
+ emit(collector, msg.message());
+ offset = msg.nextOffset();
}
- emit(collector, msg.message());
- offset = msg.nextOffset();
}
}
}