You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/11 04:18:14 UTC
kafka git commit: KAFKA-2770: Catch and ignore WakeupException for commit upon closing
Repository: kafka
Updated Branches:
refs/heads/trunk ea702c384 -> e0098b456
KAFKA-2770: Catch and ignore WakeupException for commit upon closing
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Gwen Shapira, Geoff Anderson, Jason Gustafson
Closes #470 from guozhangwang/K2770
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e0098b45
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e0098b45
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e0098b45
Branch: refs/heads/trunk
Commit: e0098b4567541d1c0cefc6f57ce38f67a9133b5e
Parents: ea702c3
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Nov 10 19:24:15 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 10 19:24:15 2015 -0800
----------------------------------------------------------------------
.../consumer/internals/ConsumerCoordinator.java | 2 ++
.../clients/consumer/internals/Fetcher.java | 6 +++-
.../main/scala/kafka/tools/MirrorMaker.scala | 37 ++++++++++++++++----
tests/kafkatest/tests/mirror_maker_test.py | 3 --
4 files changed, 38 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0098b45/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index faef7ce..2ee3a4d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -422,6 +422,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
OffsetCommitRequest.DEFAULT_RETENTION_TIME,
offsetData);
+ log.trace("Sending offset-commit request with {} to {}", offsets, coordinator);
+
return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
.compose(new OffsetCommitResponseHandler(offsets));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0098b45/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e988b2b..a034264 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -373,6 +373,10 @@ public class Fetcher<K, V> {
// this partition in the previous request at all
subscriptions.fetched(part.partition, consumed);
} else if (part.fetchOffset == consumed) {
+ long nextOffset = part.records.get(part.records.size() - 1).offset() + 1;
+
+ log.trace("Returning fetched records for assigned partition {} and update consumed position to {}", part.partition, nextOffset);
+
List<ConsumerRecord<K, V>> records = drained.get(part.partition);
if (records == null) {
records = part.records;
@@ -380,7 +384,7 @@ public class Fetcher<K, V> {
} else {
records.addAll(part.records);
}
- subscriptions.consumed(part.partition, part.records.get(part.records.size() - 1).offset() + 1);
+ subscriptions.consumed(part.partition, nextOffset);
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0098b45/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 79001de..6e54b85 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -32,7 +32,7 @@ import kafka.message.MessageAndMetadata
import kafka.metrics.KafkaMetricsGroup
import kafka.serializer.DefaultDecoder
import kafka.utils.{CommandLineUtils, CoreUtils, Logging}
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{OffsetAndMetadata, Consumer, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
@@ -41,6 +41,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.errors.WakeupException
import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
import scala.util.control.ControlThrowable
/**
@@ -335,7 +336,16 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
def commitOffsets(mirrorMakerConsumer: MirrorMakerBaseConsumer) {
if (!exitingOnSendFailure) {
trace("Committing offsets.")
- mirrorMakerConsumer.commit()
+ try {
+ mirrorMakerConsumer.commit()
+ } catch {
+ case e: WakeupException =>
+ // we only call wakeup() once to close the consumer,
+ // so if we catch it in commit we can safely retry
+ // and re-throw to break the loop
+ mirrorMakerConsumer.commit()
+ throw e
+ }
} else {
info("Exiting on send failure, skip committing offsets.")
}
@@ -383,7 +393,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
try {
while (!exitingOnSendFailure && !shuttingDown && mirrorMakerConsumer.hasData) {
val data = mirrorMakerConsumer.receive()
- trace("Sending message with value size %d".format(data.value.length))
+ trace("Sending message with value size %d and offset %d".format(data.value.length, data.offset))
val records = messageHandler.handle(data)
records.foreach(producer.send)
maybeFlushAndCommitOffsets()
@@ -403,9 +413,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
info("Flushing producer.")
producer.flush()
info("Committing consumer offsets.")
- commitOffsets(mirrorMakerConsumer)
+ try {
+ commitOffsets(mirrorMakerConsumer)
+ } catch {
+ case e: WakeupException => // just ignore
+ }
info("Shutting down consumer connectors.")
- mirrorMakerConsumer.stop()
+ // we do not need to call consumer.close() since the consumer has already been interrupted
mirrorMakerConsumer.cleanup()
shutdownLatch.countDown()
info("Mirror maker thread stopped")
@@ -419,6 +433,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
def maybeFlushAndCommitOffsets() {
if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
+ debug("Committing MirrorMaker state automatically.")
producer.flush()
commitOffsets(mirrorMakerConsumer)
lastOffsetCommitMs = System.currentTimeMillis()
@@ -492,6 +507,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
val regex = whitelistOpt.getOrElse(throw new IllegalArgumentException("New consumer only supports whitelist."))
var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
+ // TODO: we need to manually maintain the consumed offsets for new consumer
+ // since its internal consumed position is updated in batch rather than one
+ // record at a time, this can be resolved when we break the unification of both consumers
+ private val offsets = new HashMap[TopicPartition, Long]()
+
override def init() {
debug("Initiating new consumer")
val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
@@ -507,6 +527,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
recordIter = consumer.poll(1000).iterator
val record = recordIter.next()
+ val tp = new TopicPartition(record.topic, record.partition)
+
+ offsets.put(tp, record.offset + 1)
+
BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)
}
@@ -519,7 +543,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
override def commit() {
- consumer.commitSync()
+ consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, ""))})
+ offsets.clear()
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0098b45/tests/kafkatest/tests/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/mirror_maker_test.py b/tests/kafkatest/tests/mirror_maker_test.py
index 312948a..d01f6b5 100644
--- a/tests/kafkatest/tests/mirror_maker_test.py
+++ b/tests/kafkatest/tests/mirror_maker_test.py
@@ -126,9 +126,6 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
self.mirror_maker.stop()
@matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
- # Ignore tests where mirrormaker uses new consumer - both cases are currently broken
- # KAFKA-2770, KAFKA-2747
- @ignore
@matrix(new_consumer=[True], clean_shutdown=[True, False])
def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True):
"""