You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/01/31 09:36:15 UTC

[kafka] branch trunk updated: MINOR: ConsoleConsumer should not always exit when Consumer::poll returns an empty record batch (#11718)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a47dae4  MINOR: ConsoleConsumer should not always exit when Consumer::poll returns an empty record batch (#11718)
a47dae4 is described below

commit a47dae4622a25d7ca094c32ef36cf234a6c61cca
Author: David Jacot <dj...@confluent.io>
AuthorDate: Mon Jan 31 10:34:11 2022 +0100

    MINOR: ConsoleConsumer should not always exit when Consumer::poll returns an empty record batch (#11718)
    
    With https://github.com/apache/kafka/commit/ddb6959c6272d2039ed8c9f595634c3c9573f85e, `Consumer::poll` will return an empty record batch when the position advances due to aborted transactions or control records. This makes the `ConsoleConsumer` exit because it assumes that `poll` returned due to the timeout being reached. This patch fixes this by explicitly tracking the timeout.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/tools/ConsoleConsumer.scala   | 18 +++++++++---
 .../unit/kafka/tools/ConsoleConsumerTest.scala     | 32 +++++++++++++++++++++-
 2 files changed, 45 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 9cef54f..ecaaa30 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.requests.ListOffsetsRequest
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.utils.Utils
 
 import scala.jdk.CollectionConverters._
@@ -406,8 +407,15 @@ object ConsoleConsumer extends Logging {
     }
   }
 
-  private[tools] class ConsumerWrapper(topic: Option[String], partitionId: Option[Int], offset: Option[Long], includedTopics: Option[String],
-                                       consumer: Consumer[Array[Byte], Array[Byte]], val timeoutMs: Long = Long.MaxValue) {
+  private[tools] class ConsumerWrapper(
+    topic: Option[String],
+    partitionId: Option[Int],
+    offset: Option[Long],
+    includedTopics: Option[String],
+    consumer: Consumer[Array[Byte], Array[Byte]],
+    timeoutMs: Long = Long.MaxValue,
+    time: Time = Time.SYSTEM
+  ) {
     consumerInit()
     var recordIter = Collections.emptyList[ConsumerRecord[Array[Byte], Array[Byte]]]().iterator()
 
@@ -452,10 +460,12 @@ object ConsoleConsumer extends Logging {
     }
 
     def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = {
-      if (!recordIter.hasNext) {
+      val startTimeMs = time.milliseconds
+      while (!recordIter.hasNext) {
         recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator
-        if (!recordIter.hasNext)
+        if (!recordIter.hasNext && (time.milliseconds - startTimeMs > timeoutMs)) {
           throw new TimeoutException()
+        }
       }
 
       recordIter.next
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 9a8b734..686346e 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -20,8 +20,9 @@ package kafka.tools
 import java.io.{ByteArrayOutputStream, PrintStream}
 import java.nio.file.Files
 import java.util.{HashMap, Optional, Map => JMap}
+import java.time.Duration
 import kafka.tools.ConsoleConsumer.ConsumerWrapper
-import kafka.utils.{Exit, TestUtils}
+import kafka.utils.{Exit, MockTime, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
 import org.apache.kafka.common.{MessageFormatter, TopicPartition}
 import org.apache.kafka.common.record.TimestampType
@@ -30,6 +31,9 @@ import org.apache.kafka.test.MockDeserializer
 import org.mockito.Mockito._
 import org.mockito.ArgumentMatchers
 import ArgumentMatchers._
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.apache.kafka.common.errors.TimeoutException
 import org.apache.kafka.common.header.internals.RecordHeaders
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, Test}
@@ -44,6 +48,32 @@ class ConsoleConsumerTest {
   }
 
   @Test
+  def shouldThrowTimeoutExceptionWhenTimeoutIsReached(): Unit = {
+    val topic = "test"
+    val time = new MockTime
+    val timeoutMs = 1000
+
+    val mockConsumer = mock(classOf[Consumer[Array[Byte], Array[Byte]]])
+
+    when(mockConsumer.poll(Duration.ofMillis(timeoutMs))).thenAnswer { _ =>
+      time.sleep(timeoutMs / 2 + 1)
+      ConsumerRecords.EMPTY
+    }
+
+    val consumer = new ConsumerWrapper(
+      topic = Some(topic),
+      partitionId = None,
+      offset = None,
+      includedTopics = None,
+      consumer = mockConsumer,
+      timeoutMs = timeoutMs,
+      time = time
+    )
+
+    assertThrows(classOf[TimeoutException], () => consumer.receive())
+  }
+
+  @Test
   def shouldResetUnConsumedOffsetsBeforeExit(): Unit = {
     val topic = "test"
     val maxMessages: Int = 123