Posted to commits@storm.apache.org by sr...@apache.org on 2015/01/06 17:40:49 UTC

[02/11] storm git commit: Adding a special case for batch retries: in Trident, a transaction retry should not jump to an offset other than the one requested as part of the retry.

Adding a special case for batch retries: in Trident, a transaction retry should not jump to an offset other than the one requested as part of the retry.
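
The gist of the change: when a fetch hits an out-of-range offset, a fresh batch may fall back to the configured start offset, but a Trident transaction retry must replay exactly the offsets of the original batch and therefore must not jump to a new offset. Below is a minimal, self-contained sketch of that invariant; the class, fields, and method are hypothetical illustrations, not storm-kafka code.

    // Illustrative sketch only, not storm-kafka code: everything here is
    // hypothetical and exists just to show the retry invariant.
    import java.util.ArrayList;
    import java.util.List;

    class RetryAwareFetcher {

        // Pretend broker state: only offsets [earliest, latest) are still available.
        private final long earliest = 100;
        private final long latest = 105;

        /**
         * Fetch a batch starting at the given offset.
         * A fresh batch may fall back to the earliest available offset when the
         * requested one is out of range; a Trident transaction retry must not,
         * because a replayed transaction has to contain exactly the original tuples.
         */
        List<Long> fetch(long offset, boolean isTransactionRetry) {
            List<Long> batch = new ArrayList<>();
            if (offset < earliest || offset >= latest) {
                if (isTransactionRetry) {
                    // Do not jump to a different offset on retry; return an empty batch instead.
                    return batch;
                }
                // New batch: restart from the earliest offset the broker still has.
                offset = earliest;
            }
            for (long o = offset; o < latest; o++) {
                batch.add(o);
            }
            return batch;
        }
    }

In the actual diff below, this shows up as emitNewPartitionBatch catching UpdateOffsetException and re-fetching from a freshly resolved offset, while reEmitPartitionBatch catches it and simply emits nothing.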


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/86839dc6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/86839dc6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/86839dc6

Branch: refs/heads/master
Commit: 86839dc6b789045a13cf28cba008e52c4d835fa4
Parents: 65e9f0c
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Dec 8 14:49:29 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Dec 8 14:49:29 2014 -0800

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/KafkaUtils.java         |  4 +-
 .../kafka/trident/TridentKafkaEmitter.java      | 47 +++++++++++++-------
 2 files changed, 31 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/86839dc6/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 3165189..9772c0d 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,9 +180,7 @@ public class KafkaUtils {
         if (fetchResponse.hasError()) {
             KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
-                String msg = "Got fetch request with offset out of range: [" + offset + "]; " +
-                        "retrying with default start offset time from configuration. " +
-                        "configured start offset time: [" + config.startOffsetTime + "]";
+                String msg = "Got fetch request with offset out of range: [" + offset + "]";
                 LOG.warn(msg);
                 throw new UpdateOffsetException(msg);
             } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/86839dc6/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 34566c5..90d7f75 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -111,7 +111,17 @@ public class TridentKafkaEmitter {
         } else {
             offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
         }
-        ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
+
+        ByteBufferMessageSet msgs = null;
+        try {
+            msgs = fetchMessages(consumer, partition, offset);
+        } catch (UpdateOffsetException e) {
+            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
+            LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
+            offset = newOffset;
+            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+        }
+
         long endoffset = offset;
         for (MessageAndOffset msg : msgs) {
             emit(collector, msg.message());
@@ -131,13 +141,7 @@ public class TridentKafkaEmitter {
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
         long start = System.nanoTime();
         ByteBufferMessageSet msgs = null;
-        try {
-            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
-        } catch (UpdateOffsetException e) {
-            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
-            LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset);
-            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset);
-        }
+        msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _kafkaMeanFetchLatencyMetric.update(millis);
@@ -160,16 +164,25 @@ public class TridentKafkaEmitter {
             SimpleConsumer consumer = _connections.register(partition);
             long offset = (Long) meta.get("offset");
             long nextOffset = (Long) meta.get("nextOffset");
-            ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
-            for (MessageAndOffset msg : msgs) {
-                if (offset == nextOffset) {
-                    break;
-                }
-                if (offset > nextOffset) {
-                    throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
+            ByteBufferMessageSet msgs = null;
+            try {
+                msgs = fetchMessages(consumer, partition, offset);
+            } catch (UpdateOffsetException e) {
+                LOG.warn("OffsetOutOfRange during reEmitPartitionBatch, the transaction can not be replayed." +
+                        "Returning empty messages");
+            }
+
+            if(msgs != null) {
+                for (MessageAndOffset msg : msgs) {
+                    if (offset == nextOffset) {
+                        break;
+                    }
+                    if (offset > nextOffset) {
+                        throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
+                    }
+                    emit(collector, msg.message());
+                    offset = msg.nextOffset();
                 }
-                emit(collector, msg.message());
-                offset = msg.nextOffset();
             }
         }
     }