You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/09 01:38:40 UTC

kafka git commit: KAFKA-5414; Revert KAFKA-5327 which changed ConsoleConsumer offset commit behavior

Repository: kafka
Updated Branches:
  refs/heads/trunk 88aec3eb6 -> 5d7c8cc81


KAFKA-5414; Revert KAFKA-5327 which changed ConsoleConsumer offset commit behavior

This reverts commit d7d1196a0b542adb46d22eeb5b6d12af950b64c9.

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3277 from hachikuji/KAFKA-5414


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d7c8cc8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d7c8cc8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d7c8cc8

Branch: refs/heads/trunk
Commit: 5d7c8cc81af6cd2f209f8e5f3df784b9272830b9
Parents: 88aec3e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Jun 8 18:35:07 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Jun 8 18:35:12 2017 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/BaseConsumer.scala     | 46 ++++++--------------
 .../scala/kafka/tools/ConsoleConsumer.scala     |  3 --
 2 files changed, 13 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5d7c8cc8/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index fd5aa41..cec74d0 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -17,22 +17,18 @@
 
 package kafka.consumer
 
-import java.util
 import java.util.{Collections, Properties}
 import java.util.regex.Pattern
 
 import kafka.api.OffsetRequest
 import kafka.common.StreamEndException
 import kafka.message.Message
-import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata}
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.header.internals.RecordHeaders
 
-import scala.collection.mutable.HashMap
-
 /**
  * A base consumer used to abstract both old and new consumer
  * this class should be removed (along with BaseProducer)
@@ -64,13 +60,8 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
   import org.apache.kafka.clients.consumer.KafkaConsumer
 
   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
-  val offsets = new HashMap[TopicPartition, Long]()
-
   consumerInit()
-  private var currentPartition: TopicPartition = null
-  private var polledRecords = consumer.poll(0)
-  private var partitionIter = polledRecords.partitions.iterator
-  private var recordIter: util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
+  var recordIter = consumer.poll(0).iterator
 
   def consumerInit() {
     (topic, partitionId, offset, whitelist) match {
@@ -102,30 +93,21 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
   }
 
   override def receive(): BaseConsumerRecord = {
-    if (recordIter == null || !recordIter.hasNext) {
-      if (!partitionIter.hasNext) {
-        polledRecords = consumer.poll(timeoutMs)
-        partitionIter = polledRecords.partitions.iterator
-
-        if (!partitionIter.hasNext)
-          throw new ConsumerTimeoutException
-      }
-
-      currentPartition = partitionIter.next
-      recordIter = polledRecords.records(currentPartition).iterator
+    if (!recordIter.hasNext) {
+      recordIter = consumer.poll(timeoutMs).iterator
+      if (!recordIter.hasNext)
+        throw new ConsumerTimeoutException
     }
 
     val record = recordIter.next
-    offsets.put(currentPartition, record.offset + 1)
-
     BaseConsumerRecord(record.topic,
-      record.partition,
-      record.offset,
-      record.timestamp,
-      record.timestampType,
-      record.key,
-      record.value,
-      record.headers)
+                       record.partition,
+                       record.offset,
+                       record.timestamp,
+                       record.timestampType,
+                       record.key,
+                       record.value,
+                       record.headers)
   }
 
   override def stop() {
@@ -137,9 +119,7 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
   }
 
   override def commit() {
-    import scala.collection.JavaConverters._
-    consumer.commitSync(offsets.map { case (tp, offset) =>  (tp, new OffsetAndMetadata(offset))}.asJava)
-    offsets.clear()
+    this.consumer.commitSync()
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5d7c8cc8/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index a1e2ffa..335c724 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -77,7 +77,6 @@ object ConsoleConsumer extends Logging {
     try {
       process(conf.maxMessages, conf.formatter, consumer, System.out, conf.skipMessageOnError)
     } finally {
-      consumer.commit()
       consumer.cleanup()
       conf.formatter.close()
       reportRecordCount()
@@ -201,9 +200,7 @@ object ConsoleConsumer extends Logging {
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
     props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel)
-
     props
   }