You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/13 06:23:31 UTC

kafka git commit: KAFKA-2819; catch NoSuchElementException in ConsoleConsumer

Repository: kafka
Updated Branches:
  refs/heads/trunk 969d0cb0a -> 4170847f1


KAFKA-2819; catch NoSuchElementException in ConsoleConsumer

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #516 from guozhangwang/K2819


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4170847f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4170847f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4170847f

Branch: refs/heads/trunk
Commit: 4170847f1227b73a20645af1b5b2d88375e37426
Parents: 969d0cb
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Nov 12 21:23:28 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Nov 12 21:23:28 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/common/StreamEndException.scala | 7 +++++++
 core/src/main/scala/kafka/consumer/BaseConsumer.scala     | 5 ++++-
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala     | 8 ++++++--
 3 files changed, 17 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4170847f/core/src/main/scala/kafka/common/StreamEndException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/StreamEndException.scala b/core/src/main/scala/kafka/common/StreamEndException.scala
new file mode 100644
index 0000000..2d814f7
--- /dev/null
+++ b/core/src/main/scala/kafka/common/StreamEndException.scala
@@ -0,0 +1,7 @@
+package kafka.common
+
+/**
+ * An exception that indicates that a KafkaStream has ended.
+ */
+class StreamEndException() extends RuntimeException {
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/4170847f/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index ced4391..7942f57 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -20,6 +20,7 @@ package kafka.consumer
 import java.util.Properties
 import java.util.regex.Pattern
 
+import kafka.common.StreamEndException
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 
 /**
@@ -84,7 +85,9 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
   val iter = stream.iterator
 
   override def receive(): BaseConsumerRecord = {
-    // we do not need to check hasNext for KafkaStream iterator
+    if (!iter.hasNext())
+      throw new StreamEndException
+
     val messageAndMetadata = iter.next
     BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key, messageAndMetadata.message)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4170847f/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 0dedcd9..24f2ac1 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -20,6 +20,7 @@ package kafka.tools
 import java.io.PrintStream
 import java.util.{Properties, Random}
 import joptsimple._
+import kafka.common.StreamEndException
 import kafka.consumer._
 import kafka.message._
 import kafka.metrics.KafkaMetricsReporter
@@ -100,11 +101,14 @@ object ConsoleConsumer extends Logging {
       val msg: BaseConsumerRecord = try {
         consumer.receive()
       } catch {
-        case e: Throwable => {
+        case nse: StreamEndException =>
+          trace("Caught StreamEndException because consumer is shutdown, ignore and terminate.")
+          // Consumer is already closed
+          return
+        case e: Throwable =>
           error("Error processing message, terminating consumer process: ", e)
           // Consumer will be closed
           return
-        }
       }
       try {
         formatter.writeTo(msg.key, msg.value, System.out)