You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/12/31 12:42:20 UTC

kafka git commit: KAFKA-4351; MirrorMaker with new consumer should support comma-separated regex

Repository: kafka
Updated Branches:
  refs/heads/trunk 3d7e88456 -> 29d456cd2


KAFKA-4351; MirrorMaker with new consumer should support comma-separated regex

This makes it consistent with MirrorMaker with the old consumer.

Author: huxi <hu...@zhenrongbao.com>
Author: amethystic <hu...@hotmail.com>

Reviewers: Vahid Hashemian <va...@us.ibm.com>, Ismael Juma <is...@juma.me.uk>

Closes #2072 from amethystic/kafka-4351_Regex_behavior_change_for_new_consumer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29d456cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29d456cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29d456cd

Branch: refs/heads/trunk
Commit: 29d456cd21d8f3430d52db50a8fd17cc8d700f36
Parents: 3d7e884
Author: huxi <hu...@zhenrongbao.com>
Authored: Sat Dec 31 10:45:47 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Dec 31 12:22:23 2016 +0000

----------------------------------------------------------------------
 .../main/scala/kafka/tools/MirrorMaker.scala    | 17 ++---
 .../tools/MirrorMakerIntegrationTest.scala      | 73 ++++++++++++++++++++
 docs/upgrade.html                               | 22 +++---
 3 files changed, 92 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/29d456cd/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 19a2570..42456f7 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -20,7 +20,7 @@ package kafka.tools
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.regex.{Pattern, PatternSyntaxException}
+import java.util.regex.Pattern
 import java.util.{Collections, Properties}
 
 import com.yammer.metrics.core.Gauge
@@ -64,7 +64,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
  */
 object MirrorMaker extends Logging with KafkaMetricsGroup {
 
-  private var producer: MirrorMakerProducer = null
+  private[tools] var producer: MirrorMakerProducer = null
   private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
   private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
   // Track the messages not successfully sent by mirror maker.
@@ -574,7 +574,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
-  private class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
+  // Only for testing
+  private[tools] class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
                                        customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
                                        whitelistOpt: Option[String])
     extends MirrorMakerBaseConsumer {
@@ -589,12 +590,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     override def init() {
       debug("Initiating new consumer")
       val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
-      if (whitelistOpt.isDefined) {
+      whitelistOpt.foreach { whitelist =>
         try {
-          consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener)
+          consumer.subscribe(Pattern.compile(Whitelist(whitelist).regex), consumerRebalanceListener)
         } catch {
-          case pse: PatternSyntaxException =>
-            error("Invalid expression syntax: %s".format(whitelistOpt.get))
+          case pse: RuntimeException =>
+            error(s"Invalid expression syntax: $whitelist")
             throw pse
         }
       }
@@ -686,7 +687,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
   }
 
-  private class MirrorMakerProducer(val producerProps: Properties) {
+  private[tools] class MirrorMakerProducer(val producerProps: Properties) {
 
     val sync = producerProps.getProperty("producer.type", "async").equals("sync")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/29d456cd/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
new file mode 100644
index 0000000..465e8de
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -0,0 +1,73 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.tools
+
+import java.util.Properties
+
+import kafka.consumer.ConsumerTimeoutException
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.tools.MirrorMaker.{MirrorMakerNewConsumer, MirrorMakerProducer}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
+import org.junit.Test
+
+class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
+
+  override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
+    .map(KafkaConfig.fromProps(_, new Properties()))
+
+  @Test
+  def testCommaSeparatedRegex(): Unit = {
+    val topic = "new-topic"
+    val msg = "a test message"
+    val brokerList = TestUtils.getBrokerListStrFromServers(servers)
+
+    val producerProps = new Properties
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put("producer.type", "sync")
+    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
+    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
+    val producer = new MirrorMakerProducer(producerProps)
+    MirrorMaker.producer = producer
+    MirrorMaker.producer.send(new ProducerRecord(topic, msg.getBytes()))
+    MirrorMaker.producer.close()
+
+    val consumerProps = new Properties
+    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
+    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
+
+    val mirrorMakerConsumer = new MirrorMakerNewConsumer(consumer, None, whitelistOpt = Some("another_topic,new.*,foo"))
+    mirrorMakerConsumer.init()
+    try {
+      TestUtils.waitUntilTrue(() => {
+        try {
+          val data = mirrorMakerConsumer.receive()
+          data.topic == topic && new String(data.value) == msg
+        } catch {
+          // this exception is thrown if no record is returned within a short timeout, so safe to ignore
+          case _: ConsumerTimeoutException => false
+        }
+      }, "MirrorMaker consumer should read the expected message from the expected topic within the timeout")
+    } finally consumer.close()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/29d456cd/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 06b53da..3e2c52e 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -14,19 +14,9 @@
  See the License for the specific language governing permissions and
  limitations under the License.
 -->
-<h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.0 to 0.10.2.0</a></h4>
-Users upgrading from versions prior to 0.10.1.0 should follow the upgrade guide <a href="#upgrade_10_1">here</a>. Users upgrading from 0.10.1.0
+<h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0</a></h4>
+Users upgrading from versions prior to 0.10.1.x should follow the upgrade guide <a href="#upgrade_10_1">here</a>. Users upgrading from 0.10.1.0
 can upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
-<br>
-0.10.2.0 has <a href="#upgrade_10_2_0_breaking">Potential breaking changes</a> (Please review before upgrading).
-
-<h5><a id="upgrade_10_2_0_breaking" href="#upgrade_10_2_0_breaking">Potential breaking changes in 0.10.2.0</a></h5>
-<ul>
-    <li>Several methods on the Java consumer may now throw <code>InterruptException</code> if the calling thread is interrupted. 
-        Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li>
-</ul>
-
-<h4><a id="upgrade_10_2" href="#upgrade_10_2">Upgrading from 0.8.x, 0.9.x, 0.10.0.X, or 0.10.1.X to 0.10.2.0</a></h4>
 
 <p><b>For a rolling upgrade:</b></p>
 
@@ -39,6 +29,14 @@ can upgrade the brokers one at a time: shut down the broker, update the code, an
   </li>
 </ol>
 
+<h5><a id="upgrade_1020_notable" href="#upgrade_1020_notable">Notable changes in 0.10.2.0</a></h5>
+<ul>
+    <li>Several methods on the Java consumer may now throw <code>InterruptException</code> if the calling thread is interrupted.
+        Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li>
+    <li>Multiple regular expressions separated by commas can be passed to MirrorMaker with the new Java consumer via the --whitelist option. This
+        makes the behaviour consistent with MirrorMaker when used the old Scala consumer.</li>
+</ul>
+
 <h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0</a></h4>
 0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
 However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrade.