You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/01/06 17:40:48 UTC

[01/11] storm git commit: STORM-586: TridentKafkaEmitter should catch updateOffsetException.

Repository: storm
Updated Branches:
  refs/heads/master 01729f1ff -> 36f75224e


 STORM-586: TridentKafkaEmitter should catch updateOffsetException.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/65e9f0c8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/65e9f0c8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/65e9f0c8

Branch: refs/heads/master
Commit: 65e9f0c814b2cddc772880042259b66194fd6fb7
Parents: 3bbdc16
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Fri Dec 5 14:48:34 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 5 14:48:34 2014 -0800

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java  |  7 ++++---
 .../src/jvm/storm/kafka/UpdateOffsetException.java        |  5 ++++-
 .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java  | 10 +++++++++-
 3 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/65e9f0c8/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 918da74..3165189 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,10 +180,11 @@ public class KafkaUtils {
         if (fetchResponse.hasError()) {
             KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
-                LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
+                String msg = "Got fetch request with offset out of range: [" + offset + "]; " +
                         "retrying with default start offset time from configuration. " +
-                        "configured start offset time: [" + config.startOffsetTime + "]");
-                throw new UpdateOffsetException();
+                        "configured start offset time: [" + config.startOffsetTime + "]";
+                LOG.warn(msg);
+                throw new UpdateOffsetException(msg);
             } else {
                 String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                 LOG.error(message);

http://git-wip-us.apache.org/repos/asf/storm/blob/65e9f0c8/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
index 1be7312..5c366ec 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -17,6 +17,9 @@
  */
 package storm.kafka;
 
-public class UpdateOffsetException extends RuntimeException {
+public class UpdateOffsetException extends FailedFetchException {
 
+    public UpdateOffsetException(String message) {
+        super(message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/65e9f0c8/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 94bf134..34566c5 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -33,6 +33,7 @@ import storm.kafka.DynamicPartitionConnections;
 import storm.kafka.FailedFetchException;
 import storm.kafka.KafkaUtils;
 import storm.kafka.Partition;
+import storm.kafka.UpdateOffsetException;
 import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;
 import storm.trident.spout.IPartitionedTridentSpout;
@@ -129,7 +130,14 @@ public class TridentKafkaEmitter {
 
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
         long start = System.nanoTime();
-        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+        ByteBufferMessageSet msgs = null;
+        try {
+            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+        } catch (UpdateOffsetException e) {
+            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
+            LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset);
+            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset);
+        }
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _kafkaMeanFetchLatencyMetric.update(millis);


[05/11] storm git commit: added all exceptions in throws declaration.

Posted by sr...@apache.org.
added all exceptions in throws declaration.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63c86367
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63c86367
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63c86367

Branch: refs/heads/master
Commit: 63c86367b168c1e1118d498d9e5f7b2c8df033ed
Parents: fcf3135
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Dec 17 19:19:24 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Dec 17 19:19:24 2014 -0800

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/63c86367/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index b858639..706ea0e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -155,7 +155,8 @@ public class KafkaUtils {
         }
     }
 
-    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) throws TopicOffsetOutOfRangeException {
+    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset)
+            throws TopicOffsetOutOfRangeException, FailedFetchException,RuntimeException {
         ByteBufferMessageSet msgs = null;
         String topic = config.topic;
         int partitionId = partition.partition;


[07/11] storm git commit: Forcing the startTime = earliestTime when the offset is outOfRange.

Posted by sr...@apache.org.
Forcing the startTime = earliestTime when the offset is outOfRange.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7c8f25f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7c8f25f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7c8f25f

Branch: refs/heads/master
Commit: a7c8f25f3032db7658ea12f00acfdea3dfd383fc
Parents: cb548dd
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Dec 18 10:11:07 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Thu Dec 18 10:11:17 2014 -0800

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java     | 2 +-
 .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java           | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a7c8f25f/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 3f9e410..54a61f4 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -159,7 +159,7 @@ public class PartitionManager {
         try {
             msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
         } catch (TopicOffsetOutOfRangeException e) {
-            _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
+            _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
             LOG.warn("Using new offset: {}", _emittedToOffset);
             // fetch failed, so don't update the metrics
             return;

http://git-wip-us.apache.org/repos/asf/storm/blob/a7c8f25f/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 727ff8d..db4299a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -113,7 +113,7 @@ public class TridentKafkaEmitter {
         try {
             msgs = fetchMessages(consumer, partition, offset);
         } catch (TopicOffsetOutOfRangeException e) {
-            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
+            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
             LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
             offset = newOffset;
             msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);


[09/11] storm git commit: Merge branch 'STORM-586' of https://github.com/Parth-Brahmbhatt/incubator-storm into STORM-586-V2

Posted by sr...@apache.org.
Merge branch 'STORM-586' of https://github.com/Parth-Brahmbhatt/incubator-storm into STORM-586-V2


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ebc9bc5a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ebc9bc5a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ebc9bc5a

Branch: refs/heads/master
Commit: ebc9bc5ae3c74e13f06dcfe137125d1e22ed2d47
Parents: ab76e67 dc764b1
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Jan 6 06:40:07 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Jan 6 06:40:07 2015 -0800

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/KafkaUtils.java         | 10 ++---
 .../src/jvm/storm/kafka/PartitionManager.java   |  6 +--
 .../kafka/TopicOffsetOutOfRangeException.java   | 25 ++++++++++++
 .../jvm/storm/kafka/UpdateOffsetException.java  | 22 ----------
 .../kafka/trident/TridentKafkaEmitter.java      | 43 +++++++++++++-------
 .../src/test/storm/kafka/KafkaUtilsTest.java    |  2 +-
 6 files changed, 61 insertions(+), 47 deletions(-)
----------------------------------------------------------------------



[10/11] storm git commit: added STORM-586 to changelog.

Posted by sr...@apache.org.
added STORM-586 to changelog.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ee4bd378
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ee4bd378
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ee4bd378

Branch: refs/heads/master
Commit: ee4bd3786cb03785d956a3b8992220b309365c8c
Parents: ebc9bc5
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Jan 6 08:25:47 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Jan 6 08:25:47 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ee4bd378/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8ccaece..a95e8d5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.0
+ * STORM-586: Trident kafka spout fails instead of updating offset when kafka offset is out of range.
  * STORM-595: storm-hdfs can only work with sequence files that use Writables.
  * STORM-487: Remove storm.cmd, no need to duplicate work python runs on windows too.
  * STORM-585: Performance issue in none grouping


[02/11] storm git commit: Adding special case for retry batch; in the case of Trident, a transaction retry should not jump the offset requested as part of the retry.

Posted by sr...@apache.org.
Adding special case for retry batch; in the case of Trident, a transaction retry should not jump the offset requested as part of the retry.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/86839dc6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/86839dc6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/86839dc6

Branch: refs/heads/master
Commit: 86839dc6b789045a13cf28cba008e52c4d835fa4
Parents: 65e9f0c
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Dec 8 14:49:29 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Dec 8 14:49:29 2014 -0800

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/KafkaUtils.java         |  4 +-
 .../kafka/trident/TridentKafkaEmitter.java      | 47 +++++++++++++-------
 2 files changed, 31 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/86839dc6/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 3165189..9772c0d 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,9 +180,7 @@ public class KafkaUtils {
         if (fetchResponse.hasError()) {
             KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
-                String msg = "Got fetch request with offset out of range: [" + offset + "]; " +
-                        "retrying with default start offset time from configuration. " +
-                        "configured start offset time: [" + config.startOffsetTime + "]";
+                String msg = "Got fetch request with offset out of range: [" + offset + "]";
                 LOG.warn(msg);
                 throw new UpdateOffsetException(msg);
             } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/86839dc6/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 34566c5..90d7f75 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -111,7 +111,17 @@ public class TridentKafkaEmitter {
         } else {
             offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
         }
-        ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
+
+        ByteBufferMessageSet msgs = null;
+        try {
+            msgs = fetchMessages(consumer, partition, offset);
+        } catch (UpdateOffsetException e) {
+            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
+            LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
+            offset = newOffset;
+            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+        }
+
         long endoffset = offset;
         for (MessageAndOffset msg : msgs) {
             emit(collector, msg.message());
@@ -131,13 +141,7 @@ public class TridentKafkaEmitter {
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
         long start = System.nanoTime();
         ByteBufferMessageSet msgs = null;
-        try {
-            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
-        } catch (UpdateOffsetException e) {
-            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
-            LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset);
-            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset);
-        }
+        msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _kafkaMeanFetchLatencyMetric.update(millis);
@@ -160,16 +164,25 @@ public class TridentKafkaEmitter {
             SimpleConsumer consumer = _connections.register(partition);
             long offset = (Long) meta.get("offset");
             long nextOffset = (Long) meta.get("nextOffset");
-            ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
-            for (MessageAndOffset msg : msgs) {
-                if (offset == nextOffset) {
-                    break;
-                }
-                if (offset > nextOffset) {
-                    throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
+            ByteBufferMessageSet msgs = null;
+            try {
+                msgs = fetchMessages(consumer, partition, offset);
+            } catch (UpdateOffsetException e) {
+                LOG.warn("OffsetOutOfRange during reEmitPartitionBatch, the transaction can not be replayed." +
+                        "Returning empty messages");
+            }
+
+            if(msgs != null) {
+                for (MessageAndOffset msg : msgs) {
+                    if (offset == nextOffset) {
+                        break;
+                    }
+                    if (offset > nextOffset) {
+                        throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
+                    }
+                    emit(collector, msg.message());
+                    offset = msg.nextOffset();
                 }
-                emit(collector, msg.message());
-                offset = msg.nextOffset();
             }
         }
     }


[06/11] storm git commit: Merge remote-tracking branch 'upstream/master' into STORM-586

Posted by sr...@apache.org.
Merge remote-tracking branch 'upstream/master' into STORM-586


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cb548dd1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cb548dd1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cb548dd1

Branch: refs/heads/master
Commit: cb548dd13b4b90d64f41ce85e8268f7bab153d56
Parents: 63c8636 f76b3e7
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Dec 17 19:20:00 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Dec 17 19:20:00 2014 -0800

----------------------------------------------------------------------
 CHANGELOG.md                                           | 3 +++
 external/storm-kafka/README.md                         | 2 +-
 logback/cluster.xml                                    | 2 +-
 storm-core/src/clj/backtype/storm/daemon/executor.clj  | 2 +-
 storm-core/src/clj/backtype/storm/messaging/loader.clj | 3 ++-
 storm-core/src/ui/public/component.html                | 2 +-
 6 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[03/11] storm git commit: Renaming UpdateOffsetException to TopicOffsetOutOfRangeException.

Posted by sr...@apache.org.
Renaming UpdateOffsetException to TopicOffsetOutOfRangeException.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b2f48b41
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b2f48b41
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b2f48b41

Branch: refs/heads/master
Commit: b2f48b41f19398498c7ae41c2059f3685b87ac22
Parents: 86839dc
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Dec 8 15:06:26 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Dec 8 15:06:26 2014 -0800

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/KafkaUtils.java         |  4 ++--
 .../src/jvm/storm/kafka/PartitionManager.java   |  4 +---
 .../kafka/TopicOffsetOutOfRangeException.java   | 25 ++++++++++++++++++++
 .../jvm/storm/kafka/UpdateOffsetException.java  | 25 --------------------
 .../kafka/trident/TridentKafkaEmitter.java      | 11 ++++-----
 .../src/test/storm/kafka/KafkaUtilsTest.java    |  2 +-
 6 files changed, 33 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 9772c0d..b858639 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -155,7 +155,7 @@ public class KafkaUtils {
         }
     }
 
-    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) throws UpdateOffsetException {
+    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) throws TopicOffsetOutOfRangeException {
         ByteBufferMessageSet msgs = null;
         String topic = config.topic;
         int partitionId = partition.partition;
@@ -182,7 +182,7 @@ public class KafkaUtils {
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
                 String msg = "Got fetch request with offset out of range: [" + offset + "]";
                 LOG.warn(msg);
-                throw new UpdateOffsetException(msg);
+                throw new TopicOffsetOutOfRangeException(msg);
             } else {
                 String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                 LOG.error(message);

http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index d24a49e..3f9e410 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -23,11 +23,9 @@ import backtype.storm.metric.api.CountMetric;
 import backtype.storm.metric.api.MeanReducer;
 import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.utils.Utils;
 import com.google.common.collect.ImmutableMap;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.ByteBufferMessageSet$;
 import kafka.message.MessageAndOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,7 +158,7 @@ public class PartitionManager {
         ByteBufferMessageSet msgs = null;
         try {
             msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
-        } catch (UpdateOffsetException e) {
+        } catch (TopicOffsetOutOfRangeException e) {
             _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
             LOG.warn("Using new offset: {}", _emittedToOffset);
             // fetch failed, so don't update the metrics

http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java b/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
new file mode 100644
index 0000000..2e6d2f5
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+public class TopicOffsetOutOfRangeException extends FailedFetchException {
+
+    public TopicOffsetOutOfRangeException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
deleted file mode 100644
index 5c366ec..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-public class UpdateOffsetException extends FailedFetchException {
-
-    public UpdateOffsetException(String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 90d7f75..727ff8d 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -29,11 +29,8 @@ import kafka.message.Message;
 import kafka.message.MessageAndOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import storm.kafka.DynamicPartitionConnections;
-import storm.kafka.FailedFetchException;
-import storm.kafka.KafkaUtils;
-import storm.kafka.Partition;
-import storm.kafka.UpdateOffsetException;
+import storm.kafka.*;
+import storm.kafka.TopicOffsetOutOfRangeException;
 import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;
 import storm.trident.spout.IPartitionedTridentSpout;
@@ -115,7 +112,7 @@ public class TridentKafkaEmitter {
         ByteBufferMessageSet msgs = null;
         try {
             msgs = fetchMessages(consumer, partition, offset);
-        } catch (UpdateOffsetException e) {
+        } catch (TopicOffsetOutOfRangeException e) {
             long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
             LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
             offset = newOffset;
@@ -167,7 +164,7 @@ public class TridentKafkaEmitter {
             ByteBufferMessageSet msgs = null;
             try {
                 msgs = fetchMessages(consumer, partition, offset);
-            } catch (UpdateOffsetException e) {
+            } catch (TopicOffsetOutOfRangeException e) {
                 LOG.warn("OffsetOutOfRange during reEmitPartitionBatch, the transaction can not be replayed." +
                         "Returning empty messages");
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/b2f48b41/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index ae2fe6c..1f1bbbc 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -99,7 +99,7 @@ public class KafkaUtilsTest {
                 new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), -99);
     }
 
-    @Test(expected = UpdateOffsetException.class)
+    @Test(expected = TopicOffsetOutOfRangeException.class)
     public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception {
         config = new KafkaConfig(brokerHosts, "newTopic");
         String value = "test";


[11/11] storm git commit: Merge branch 'STORM-586-V2'

Posted by sr...@apache.org.
Merge branch 'STORM-586-V2'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/36f75224
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/36f75224
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/36f75224

Branch: refs/heads/master
Commit: 36f75224e6aeb196caac0b43123b12014ac118ff
Parents: 01729f1 ee4bd37
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Tue Jan 6 08:35:56 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Tue Jan 6 08:35:56 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |  1 +
 .../src/jvm/storm/kafka/KafkaUtils.java         | 10 ++---
 .../src/jvm/storm/kafka/PartitionManager.java   |  6 +--
 .../kafka/TopicOffsetOutOfRangeException.java   | 25 ++++++++++++
 .../jvm/storm/kafka/UpdateOffsetException.java  | 22 ----------
 .../kafka/trident/TridentKafkaEmitter.java      | 43 +++++++++++++-------
 .../src/test/storm/kafka/KafkaUtilsTest.java    |  2 +-
 7 files changed, 62 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/36f75224/CHANGELOG.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/36f75224/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------


[08/11] storm git commit: Transactional spout should fail on TopicOffsetOutOfRangeException while retrying.

Posted by sr...@apache.org.
Transactional spout should fail on TopicOffsetOutOfRangeException while retrying.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dc764b10
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dc764b10
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dc764b10

Branch: refs/heads/master
Commit: dc764b10d4862bb2ea123c23289d54600d7b4ffc
Parents: a7c8f25
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Dec 18 16:45:01 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Thu Dec 18 16:45:01 2014 -0800

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java      | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dc764b10/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index db4299a..1a9be43 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -162,12 +162,7 @@ public class TridentKafkaEmitter {
             long offset = (Long) meta.get("offset");
             long nextOffset = (Long) meta.get("nextOffset");
             ByteBufferMessageSet msgs = null;
-            try {
-                msgs = fetchMessages(consumer, partition, offset);
-            } catch (TopicOffsetOutOfRangeException e) {
-                LOG.warn("OffsetOutOfRange during reEmitPartitionBatch, the transaction can not be replayed." +
-                        "Returning empty messages");
-            }
+            msgs = fetchMessages(consumer, partition, offset);
 
             if(msgs != null) {
                 for (MessageAndOffset msg : msgs) {


[04/11] storm git commit: Reverting back to TopicOffsetOutOfRangeException extends RuntimeException.

Posted by sr...@apache.org.
Reverting back to TopicOffsetOutOfRangeException extends RuntimeException.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fcf31350
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fcf31350
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fcf31350

Branch: refs/heads/master
Commit: fcf31350b62ca0efeeea96c8e1b0134edb81c1eb
Parents: b2f48b4
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Dec 8 15:10:13 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Dec 8 15:10:13 2014 -0800

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java        | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fcf31350/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java b/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
index 2e6d2f5..5101a3e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/TopicOffsetOutOfRangeException.java
@@ -17,7 +17,7 @@
  */
 package storm.kafka;
 
-public class TopicOffsetOutOfRangeException extends FailedFetchException {
+public class TopicOffsetOutOfRangeException extends RuntimeException {
 
     public TopicOffsetOutOfRangeException(String message) {
         super(message);