You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/09 01:38:40 UTC
kafka git commit: KAFKA-5414; Revert KAFKA-5327 which changed ConsoleConsumer offset commit behavior
Repository: kafka
Updated Branches:
refs/heads/trunk 88aec3eb6 -> 5d7c8cc81
KAFKA-5414; Revert KAFKA-5327 which changed ConsoleConsumer offset commit behavior
This reverts commit d7d1196a0b542adb46d22eeb5b6d12af950b64c9.
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3277 from hachikuji/KAFKA-5414
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d7c8cc8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d7c8cc8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d7c8cc8
Branch: refs/heads/trunk
Commit: 5d7c8cc81af6cd2f209f8e5f3df784b9272830b9
Parents: 88aec3e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Jun 8 18:35:07 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Jun 8 18:35:12 2017 -0700
----------------------------------------------------------------------
.../scala/kafka/consumer/BaseConsumer.scala | 46 ++++++--------------
.../scala/kafka/tools/ConsoleConsumer.scala | 3 --
2 files changed, 13 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d7c8cc8/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index fd5aa41..cec74d0 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -17,22 +17,18 @@
package kafka.consumer
-import java.util
import java.util.{Collections, Properties}
import java.util.regex.Pattern
import kafka.api.OffsetRequest
import kafka.common.StreamEndException
import kafka.message.Message
-import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata}
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.header.internals.RecordHeaders
-import scala.collection.mutable.HashMap
-
/**
* A base consumer used to abstract both old and new consumer
* this class should be removed (along with BaseProducer)
@@ -64,13 +60,8 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
import org.apache.kafka.clients.consumer.KafkaConsumer
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
- val offsets = new HashMap[TopicPartition, Long]()
-
consumerInit()
- private var currentPartition: TopicPartition = null
- private var polledRecords = consumer.poll(0)
- private var partitionIter = polledRecords.partitions.iterator
- private var recordIter: util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
+ var recordIter = consumer.poll(0).iterator
def consumerInit() {
(topic, partitionId, offset, whitelist) match {
@@ -102,30 +93,21 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
}
override def receive(): BaseConsumerRecord = {
- if (recordIter == null || !recordIter.hasNext) {
- if (!partitionIter.hasNext) {
- polledRecords = consumer.poll(timeoutMs)
- partitionIter = polledRecords.partitions.iterator
-
- if (!partitionIter.hasNext)
- throw new ConsumerTimeoutException
- }
-
- currentPartition = partitionIter.next
- recordIter = polledRecords.records(currentPartition).iterator
+ if (!recordIter.hasNext) {
+ recordIter = consumer.poll(timeoutMs).iterator
+ if (!recordIter.hasNext)
+ throw new ConsumerTimeoutException
}
val record = recordIter.next
- offsets.put(currentPartition, record.offset + 1)
-
BaseConsumerRecord(record.topic,
- record.partition,
- record.offset,
- record.timestamp,
- record.timestampType,
- record.key,
- record.value,
- record.headers)
+ record.partition,
+ record.offset,
+ record.timestamp,
+ record.timestampType,
+ record.key,
+ record.value,
+ record.headers)
}
override def stop() {
@@ -137,9 +119,7 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
}
override def commit() {
- import scala.collection.JavaConverters._
- consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset))}.asJava)
- offsets.clear()
+ this.consumer.commitSync()
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d7c8cc8/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index a1e2ffa..335c724 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -77,7 +77,6 @@ object ConsoleConsumer extends Logging {
try {
process(conf.maxMessages, conf.formatter, consumer, System.out, conf.skipMessageOnError)
} finally {
- consumer.commit()
consumer.cleanup()
conf.formatter.close()
reportRecordCount()
@@ -201,9 +200,7 @@ object ConsoleConsumer extends Logging {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, config.isolationLevel)
-
props
}