You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/01/31 09:36:15 UTC
[kafka] branch trunk updated: MINOR: ConsoleConsumer should not always exit when Consumer::poll returns an empty record batch (#11718)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a47dae4 MINOR: ConsoleConsumer should not always exit when Consumer::poll returns an empty record batch (#11718)
a47dae4 is described below
commit a47dae4622a25d7ca094c32ef36cf234a6c61cca
Author: David Jacot <dj...@confluent.io>
AuthorDate: Mon Jan 31 10:34:11 2022 +0100
MINOR: ConsoleConsumer should not always exit when Consumer::poll returns an empty record batch (#11718)
With https://github.com/apache/kafka/commit/ddb6959c6272d2039ed8c9f595634c3c9573f85e, `Consumer::poll` will return an empty record batch when the position advances due to aborted transactions or control records. This makes the `ConsoleConsumer` exit, because it assumes that `poll` returned due to the timeout being reached. This patch fixes the issue by explicitly tracking the timeout.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../main/scala/kafka/tools/ConsoleConsumer.scala | 18 +++++++++---
.../unit/kafka/tools/ConsoleConsumerTest.scala | 32 +++++++++++++++++++++-
2 files changed, 45 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 9cef54f..ecaaa30 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
+import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Utils
import scala.jdk.CollectionConverters._
@@ -406,8 +407,15 @@ object ConsoleConsumer extends Logging {
}
}
- private[tools] class ConsumerWrapper(topic: Option[String], partitionId: Option[Int], offset: Option[Long], includedTopics: Option[String],
- consumer: Consumer[Array[Byte], Array[Byte]], val timeoutMs: Long = Long.MaxValue) {
+ private[tools] class ConsumerWrapper(
+ topic: Option[String],
+ partitionId: Option[Int],
+ offset: Option[Long],
+ includedTopics: Option[String],
+ consumer: Consumer[Array[Byte], Array[Byte]],
+ timeoutMs: Long = Long.MaxValue,
+ time: Time = Time.SYSTEM
+ ) {
consumerInit()
var recordIter = Collections.emptyList[ConsumerRecord[Array[Byte], Array[Byte]]]().iterator()
@@ -452,10 +460,12 @@ object ConsoleConsumer extends Logging {
}
def receive(): ConsumerRecord[Array[Byte], Array[Byte]] = {
- if (!recordIter.hasNext) {
+ val startTimeMs = time.milliseconds
+ while (!recordIter.hasNext) {
recordIter = consumer.poll(Duration.ofMillis(timeoutMs)).iterator
- if (!recordIter.hasNext)
+ if (!recordIter.hasNext && (time.milliseconds - startTimeMs > timeoutMs)) {
throw new TimeoutException()
+ }
}
recordIter.next
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 9a8b734..686346e 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -20,8 +20,9 @@ package kafka.tools
import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.file.Files
import java.util.{HashMap, Optional, Map => JMap}
+import java.time.Duration
import kafka.tools.ConsoleConsumer.ConsumerWrapper
-import kafka.utils.{Exit, TestUtils}
+import kafka.utils.{Exit, MockTime, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.{MessageFormatter, TopicPartition}
import org.apache.kafka.common.record.TimestampType
@@ -30,6 +31,9 @@ import org.apache.kafka.test.MockDeserializer
import org.mockito.Mockito._
import org.mockito.ArgumentMatchers
import ArgumentMatchers._
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.header.internals.RecordHeaders
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, Test}
@@ -44,6 +48,32 @@ class ConsoleConsumerTest {
}
@Test
+ def shouldThrowTimeoutExceptionWhenTimeoutIsReached(): Unit = {
+ val topic = "test"
+ val time = new MockTime
+ val timeoutMs = 1000
+
+ val mockConsumer = mock(classOf[Consumer[Array[Byte], Array[Byte]]])
+
+ when(mockConsumer.poll(Duration.ofMillis(timeoutMs))).thenAnswer { _ =>
+ time.sleep(timeoutMs / 2 + 1)
+ ConsumerRecords.EMPTY
+ }
+
+ val consumer = new ConsumerWrapper(
+ topic = Some(topic),
+ partitionId = None,
+ offset = None,
+ includedTopics = None,
+ consumer = mockConsumer,
+ timeoutMs = timeoutMs,
+ time = time
+ )
+
+ assertThrows(classOf[TimeoutException], () => consumer.receive())
+ }
+
+ @Test
def shouldResetUnConsumedOffsetsBeforeExit(): Unit = {
val topic = "test"
val maxMessages: Int = 123