You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/12/31 12:42:20 UTC
kafka git commit: KAFKA-4351; MirrorMaker with new consumer should support comma-separated regex
Repository: kafka
Updated Branches:
refs/heads/trunk 3d7e88456 -> 29d456cd2
KAFKA-4351; MirrorMaker with new consumer should support comma-separated regex
This makes it consistent with MirrorMaker with the old consumer.
Author: huxi <hu...@zhenrongbao.com>
Author: amethystic <hu...@hotmail.com>
Reviewers: Vahid Hashemian <va...@us.ibm.com>, Ismael Juma <is...@juma.me.uk>
Closes #2072 from amethystic/kafka-4351_Regex_behavior_change_for_new_consumer
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29d456cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29d456cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29d456cd
Branch: refs/heads/trunk
Commit: 29d456cd21d8f3430d52db50a8fd17cc8d700f36
Parents: 3d7e884
Author: huxi <hu...@zhenrongbao.com>
Authored: Sat Dec 31 10:45:47 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Dec 31 12:22:23 2016 +0000
----------------------------------------------------------------------
.../main/scala/kafka/tools/MirrorMaker.scala | 17 ++---
.../tools/MirrorMakerIntegrationTest.scala | 73 ++++++++++++++++++++
docs/upgrade.html | 22 +++---
3 files changed, 92 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/29d456cd/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 19a2570..42456f7 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -20,7 +20,7 @@ package kafka.tools
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.regex.{Pattern, PatternSyntaxException}
+import java.util.regex.Pattern
import java.util.{Collections, Properties}
import com.yammer.metrics.core.Gauge
@@ -64,7 +64,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig => NewConsumerConfig}
*/
object MirrorMaker extends Logging with KafkaMetricsGroup {
- private var producer: MirrorMakerProducer = null
+ private[tools] var producer: MirrorMakerProducer = null
private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
// Track the messages not successfully sent by mirror maker.
@@ -574,7 +574,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
}
- private class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
+ // Only for testing
+ private[tools] class MirrorMakerNewConsumer(consumer: Consumer[Array[Byte], Array[Byte]],
customRebalanceListener: Option[org.apache.kafka.clients.consumer.ConsumerRebalanceListener],
whitelistOpt: Option[String])
extends MirrorMakerBaseConsumer {
@@ -589,12 +590,12 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
override def init() {
debug("Initiating new consumer")
val consumerRebalanceListener = new InternalRebalanceListenerForNewConsumer(this, customRebalanceListener)
- if (whitelistOpt.isDefined) {
+ whitelistOpt.foreach { whitelist =>
try {
- consumer.subscribe(Pattern.compile(whitelistOpt.get), consumerRebalanceListener)
+ consumer.subscribe(Pattern.compile(Whitelist(whitelist).regex), consumerRebalanceListener)
} catch {
- case pse: PatternSyntaxException =>
- error("Invalid expression syntax: %s".format(whitelistOpt.get))
+ case pse: RuntimeException =>
+ error(s"Invalid expression syntax: $whitelist")
throw pse
}
}
@@ -686,7 +687,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
}
- private class MirrorMakerProducer(val producerProps: Properties) {
+ private[tools] class MirrorMakerProducer(val producerProps: Properties) {
val sync = producerProps.getProperty("producer.type", "async").equals("sync")
http://git-wip-us.apache.org/repos/asf/kafka/blob/29d456cd/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
new file mode 100644
index 0000000..465e8de
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.tools
+
+import java.util.Properties
+
+import kafka.consumer.ConsumerTimeoutException
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.tools.MirrorMaker.{MirrorMakerNewConsumer, MirrorMakerProducer}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
+import org.junit.Test
+
+class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
+
+ override def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect)
+ .map(KafkaConfig.fromProps(_, new Properties()))
+
+ @Test
+ def testCommaSeparatedRegex(): Unit = {
+ val topic = "new-topic"
+ val msg = "a test message"
+ val brokerList = TestUtils.getBrokerListStrFromServers(servers)
+
+ val producerProps = new Properties
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ producerProps.put("producer.type", "sync")
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
+ val producer = new MirrorMakerProducer(producerProps)
+ MirrorMaker.producer = producer
+ MirrorMaker.producer.send(new ProducerRecord(topic, msg.getBytes()))
+ MirrorMaker.producer.close()
+
+ val consumerProps = new Properties
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
+
+ val mirrorMakerConsumer = new MirrorMakerNewConsumer(consumer, None, whitelistOpt = Some("another_topic,new.*,foo"))
+ mirrorMakerConsumer.init()
+ try {
+ TestUtils.waitUntilTrue(() => {
+ try {
+ val data = mirrorMakerConsumer.receive()
+ data.topic == topic && new String(data.value) == msg
+ } catch {
+ // this exception is thrown if no record is returned within a short timeout, so safe to ignore
+ case _: ConsumerTimeoutException => false
+ }
+ }, "MirrorMaker consumer should read the expected message from the expected topic within the timeout")
+ } finally consumer.close()
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/29d456cd/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 06b53da..3e2c52e 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -14,19 +14,9 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.0 to 0.10.2.0</a></h4>
-Users upgrading from versions prior to 0.10.1.0 should follow the upgrade guide <a href="#upgrade_10_1">here</a>. Users upgrading from 0.10.1.0
+<h4><a id="upgrade_10_2_0" href="#upgrade_10_2_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x or 0.10.1.x to 0.10.2.0</a></h4>
+Users upgrading from versions prior to 0.10.1.x should follow the upgrade guide <a href="#upgrade_10_1">here</a>. Users upgrading from 0.10.1.0
can upgrade the brokers one at a time: shut down the broker, update the code, and restart it.
-<br>
-0.10.2.0 has <a href="#upgrade_10_2_0_breaking">Potential breaking changes</a> (Please review before upgrading).
-
-<h5><a id="upgrade_10_2_0_breaking" href="#upgrade_10_2_0_breaking">Potential breaking changes in 0.10.2.0</a></h5>
-<ul>
- <li>Several methods on the Java consumer may now throw <code>InterruptException</code> if the calling thread is interrupted.
- Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li>
-</ul>
-
-<h4><a id="upgrade_10_2" href="#upgrade_10_2">Upgrading from 0.8.x, 0.9.x, 0.10.0.X, or 0.10.1.X to 0.10.2.0</a></h4>
<p><b>For a rolling upgrade:</b></p>
@@ -39,6 +29,14 @@ can upgrade the brokers one at a time: shut down the broker, update the code, an
</li>
</ol>
+<h5><a id="upgrade_1020_notable" href="#upgrade_1020_notable">Notable changes in 0.10.2.0</a></h5>
+<ul>
+ <li>Several methods on the Java consumer may now throw <code>InterruptException</code> if the calling thread is interrupted.
+ Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li>
+ <li>Multiple regular expressions separated by commas can be passed to MirrorMaker with the new Java consumer via the --whitelist option. This
+ makes the behaviour consistent with MirrorMaker when used with the old Scala consumer.</li>
+</ul>
+
<h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0</a></h4>
0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade.
However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrade.