Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/11 04:18:14 UTC

kafka git commit: KAFKA-2770: Catch and ignore WakeupException for commit upon closing

Repository: kafka
Updated Branches:
  refs/heads/trunk ea702c384 -> e0098b456


KAFKA-2770: Catch and ignore WakeupException for commit upon closing

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Gwen Shapira, Geoff Anderson, Jason Gustafson

Closes #470 from guozhangwang/K2770
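
For context, the shutdown sequence this patch hardens works roughly as
follows: the shutdown path calls consumer.wakeup() exactly once to break the
mirror-maker thread out of a blocking poll(); if that wakeup instead lands
while commitSync() is in flight, the commit throws WakeupException even
though the commit itself is still safe to perform. A minimal sketch of the
resulting pattern (illustrative names, not the MirrorMaker code itself):

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.errors.WakeupException

    object WakeupSafeCommit {
      // Commit that tolerates a single wakeup(): since wakeup() is only ever
      // called once (to initiate shutdown), a WakeupException raised here can
      // be retried once safely; the caller decides whether to re-throw.
      def commitOnce[K, V](consumer: KafkaConsumer[K, V]): Unit = {
        try {
          consumer.commitSync()
        } catch {
          case e: WakeupException =>
            consumer.commitSync() // the retried commit will not be woken again
            throw e               // propagate so the poll loop can exit
        }
      }
    }

On the final commit during shutdown the exception is simply swallowed
instead of re-thrown, as the diff below shows.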


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e0098b45
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e0098b45
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e0098b45

Branch: refs/heads/trunk
Commit: e0098b4567541d1c0cefc6f57ce38f67a9133b5e
Parents: ea702c3
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Nov 10 19:24:15 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 10 19:24:15 2015 -0800

----------------------------------------------------------------------
 .../consumer/internals/ConsumerCoordinator.java |  2 ++
 .../clients/consumer/internals/Fetcher.java     |  6 +++-
 .../main/scala/kafka/tools/MirrorMaker.scala    | 37 ++++++++++++++++----
 tests/kafkatest/tests/mirror_maker_test.py      |  3 --
 4 files changed, 38 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e0098b45/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index faef7ce..2ee3a4d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -422,6 +422,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                 offsetData);
 
+        log.trace("Sending offset-commit request with offsets {} to coordinator {}", offsets, coordinator);
+
         return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
                 .compose(new OffsetCommitResponseHandler(offsets));
     }
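
The added line uses slf4j-style parameterized logging, so the offsets map
and the coordinator node are only rendered to strings when TRACE is actually
enabled. A small self-contained illustration (hypothetical class, not Kafka
code):

    import org.slf4j.LoggerFactory

    object TraceLogging {
      private val log = LoggerFactory.getLogger(getClass)

      def example(offsets: Map[String, Long], coordinator: String): Unit = {
        // the {} placeholders defer argument formatting until the
        // TRACE level is checked, avoiding string-building overhead
        log.trace("Sending offset-commit request with offsets {} to coordinator {}", offsets, coordinator)
      }
    }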

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0098b45/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e988b2b..a034264 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -373,6 +373,10 @@ public class Fetcher<K, V> {
                     // this partition in the previous request at all
                     subscriptions.fetched(part.partition, consumed);
                 } else if (part.fetchOffset == consumed) {
+                    long nextOffset = part.records.get(part.records.size() - 1).offset() + 1;
+
+                    log.trace("Returning fetched records for assigned partition {} and update consumed position to {}", part.partition, nextOffset);
+
                     List<ConsumerRecord<K, V>> records = drained.get(part.partition);
                     if (records == null) {
                         records = part.records;
@@ -380,7 +384,7 @@ public class Fetcher<K, V> {
                     } else {
                         records.addAll(part.records);
                     }
-                    subscriptions.consumed(part.partition, part.records.get(part.records.size() - 1).offset() + 1);
+                    subscriptions.consumed(part.partition, nextOffset);
                 } else {
                     // these records aren't next in line based on the last consumed position, ignore them
                     // they must be from an obsolete request
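
The refactor above computes the next consumed position (the offset of the
last record in the returned batch plus one) once, and uses it both for the
trace message and for the subscriptions.consumed() update. A hedged sketch
of that bookkeeping, with Record as a stand-in for ConsumerRecord:

    object ConsumedPosition {
      final case class Record(offset: Long)

      // the consumed position after draining a non-empty batch is the
      // offset of its last record plus one, i.e. the next offset to return
      def next(batch: List[Record]): Long =
        batch.last.offset + 1
    }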

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0098b45/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 79001de..6e54b85 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -32,7 +32,7 @@ import kafka.message.MessageAndMetadata
 import kafka.metrics.KafkaMetricsGroup
 import kafka.serializer.DefaultDecoder
 import kafka.utils.{CommandLineUtils, CoreUtils, Logging}
-import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.consumer.{OffsetAndMetadata, Consumer, ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
@@ -41,6 +41,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.WakeupException
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable.HashMap
 import scala.util.control.ControlThrowable
 
 /**
@@ -335,7 +336,16 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   def commitOffsets(mirrorMakerConsumer: MirrorMakerBaseConsumer) {
     if (!exitingOnSendFailure) {
       trace("Committing offsets.")
-      mirrorMakerConsumer.commit()
+      try {
+        mirrorMakerConsumer.commit()
+      } catch {
+        case e: WakeupException =>
+          // we only call wakeup() once to close the consumer, so if the
+          // commit is interrupted by it we can safely retry the commit once
+          // and then re-throw the exception to break the loop
+          mirrorMakerConsumer.commit()
+          throw e
+      }
     } else {
       info("Exiting on send failure, skip committing offsets.")
     }
@@ -383,7 +393,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
           try {
             while (!exitingOnSendFailure && !shuttingDown && mirrorMakerConsumer.hasData) {
               val data = mirrorMakerConsumer.receive()
-              trace("Sending message with value size %d".format(data.value.length))
+              trace("Sending message with value size %d and offset %d".format(data.value.length, data.offset))
               val records = messageHandler.handle(data)
               records.foreach(producer.send)
               maybeFlushAndCommitOffsets()
@@ -403,9 +413,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         info("Flushing producer.")
         producer.flush()
         info("Committing consumer offsets.")
-        commitOffsets(mirrorMakerConsumer)
+        try {
+          commitOffsets(mirrorMakerConsumer)
+        } catch {
+          case e: WakeupException => // just ignore
+        }
         info("Shutting down consumer connectors.")
-        mirrorMakerConsumer.stop()
+        // we do not need to call consumer.close() since the consumer has already been interrupted
         mirrorMakerConsumer.cleanup()
         shutdownLatch.countDown()
         info("Mirror maker thread stopped")
@@ -419,6 +433,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     def maybeFlushAndCommitOffsets() {
       if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
+        debug("Committing MirrorMaker state automatically.")
         producer.flush()
         commitOffsets(mirrorMakerConsumer)
         lastOffsetCommitMs = System.currentTimeMillis()
@@ -492,6 +507,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     val regex = whitelistOpt.getOrElse(throw new IllegalArgumentException("New consumer only supports whitelist."))
     var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
 
+    // TODO: we need to manually maintain the consumed offsets for the new consumer,
+    // since its internal consumed position is updated in batches rather than one
+    // record at a time; this can be resolved once we break the unification of the two consumers
+    private val offsets = new HashMap[TopicPartition, Long]()
+
     override def init() {
       debug("Initiating new consumer")
       val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
@@ -507,6 +527,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
         recordIter = consumer.poll(1000).iterator
 
       val record = recordIter.next()
+      val tp = new TopicPartition(record.topic, record.partition)
+
+      offsets.put(tp, record.offset + 1)
+
       BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)
     }
 
@@ -519,7 +543,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
 
     override def commit() {
-      consumer.commitSync()
+      consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, "")) })
+      offsets.clear()
     }
   }
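
Because the new consumer advances its internal position a batch at a time
while MirrorMaker hands out records one at a time, the patch keeps its own
map from partition to next offset and commits exactly that map. A condensed
sketch of the same idea (OffsetTracker is an illustrative name):

    import scala.collection.JavaConversions._
    import scala.collection.mutable.HashMap
    import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
    import org.apache.kafka.common.TopicPartition

    class OffsetTracker[K, V](consumer: KafkaConsumer[K, V]) {
      private val offsets = new HashMap[TopicPartition, Long]()

      // call once per record handed to the producer: the committed offset
      // for a partition is always the next offset to consume
      def onRecord(topic: String, partition: Int, offset: Long): Unit =
        offsets.put(new TopicPartition(topic, partition), offset + 1)

      // commit only what has actually been handed out, then reset
      def commit(): Unit = {
        consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, "")) })
        offsets.clear()
      }
    }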
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/e0098b45/tests/kafkatest/tests/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/mirror_maker_test.py b/tests/kafkatest/tests/mirror_maker_test.py
index 312948a..d01f6b5 100644
--- a/tests/kafkatest/tests/mirror_maker_test.py
+++ b/tests/kafkatest/tests/mirror_maker_test.py
@@ -126,9 +126,6 @@ class TestMirrorMakerService(ProduceConsumeValidateTest):
         self.mirror_maker.stop()
 
     @matrix(offsets_storage=["kafka", "zookeeper"], new_consumer=[False], clean_shutdown=[True, False])
-    # Ignore tests where mirrormaker uses new consumer - both cases are currently broken
-    # KAFKA-2770, KAFKA-2747
-    @ignore
     @matrix(new_consumer=[True], clean_shutdown=[True, False])
     def test_bounce(self, offsets_storage="kafka", new_consumer=True, clean_shutdown=True):
         """