You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2020/07/03 09:42:41 UTC
[kafka] branch trunk updated: KAFKA-10232: MirrorMaker2 internal topics Formatters KIP-597 (#8604)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new caa806c KAFKA-10232: MirrorMaker2 internal topics Formatters KIP-597 (#8604)
caa806c is described below
commit caa806cd82fb9fa88510c81de53e69ac9846311d
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Fri Jul 3 10:41:45 2020 +0100
KAFKA-10232: MirrorMaker2 internal topics Formatters KIP-597 (#8604)
This PR includes 3 MessageFormatters for MirrorMaker2 internal topics:
- HeartbeatFormatter
- CheckpointFormatter
- OffsetSyncFormatter
This also introduces a new public interface org.apache.kafka.common.MessageFormatter that users can implement to build custom formatters.
Reviewers: Konstantine Karantasis <k....@gmail.com>, Ryanne Dolan <ry...@gmail.com>, David Jacot <dj...@confluent.io>
Co-authored-by: Mickael Maison <mi...@gmail.com>
Co-authored-by: Edoardo Comar <ec...@uk.ibm.com>
---
checkstyle/import-control.xml | 1 +
.../org/apache/kafka/common/MessageFormatter.java | 66 +++++++++++++++++++
.../apache/kafka/connect/mirror/OffsetSync.java | 10 +--
.../mirror/formatters/CheckpointFormatter.java | 31 +++++++++
.../mirror/formatters/HeartbeatFormatter.java | 31 +++++++++
.../mirror/formatters/OffsetSyncFormatter.java | 31 +++++++++
.../main/scala/kafka/common/MessageFormatter.scala | 15 +----
.../coordinator/group/GroupMetadataManager.scala | 4 +-
.../coordinator/transaction/TransactionLog.scala | 3 +-
.../main/scala/kafka/tools/ConsoleConsumer.scala | 23 ++++---
.../unit/kafka/tools/ConsoleConsumerTest.scala | 73 +++++++++++++++++++++-
gradle/spotbugs-exclude.xml | 7 +++
12 files changed, 258 insertions(+), 37 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b990081..f45e362 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -48,6 +48,7 @@
<allow pkg="org.apache.kafka.common.memory" />
<subpackage name="common">
+ <allow class="org.apache.kafka.clients.consumer.ConsumerRecord" exact-match="true" />
<disallow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.annotation" />
diff --git a/clients/src/main/java/org/apache/kafka/common/MessageFormatter.java b/clients/src/main/java/org/apache/kafka/common/MessageFormatter.java
new file mode 100644
index 0000000..c4a255f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/MessageFormatter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.io.Closeable;
+import java.io.PrintStream;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * This interface allows to define Formatters that can be used to parse and format records read by a
+ * Consumer instance for display.
+ * The kafka-console-consumer has built-in support for MessageFormatter, via the --formatter flag.
+ *
+ * Kafka provides a few implementations to display records of internal topics such as __consumer_offsets,
+ * __transaction_state and the MirrorMaker2 topics.
+ *
+ */
+public interface MessageFormatter extends Configurable, Closeable {
+
+ /**
+ * Initialises the MessageFormatter
+ * @param props Properties to configure the formatter
+ * @deprecated Use {@link #configure(Map)} instead, this method is for backward compatibility with the older Formatter interface
+ */
+ @Deprecated
+ default public void init(Properties props) {}
+
+ /**
+ * Configures the MessageFormatter
+ * @param configs Map to configure the formatter
+ */
+ default public void configure(Map<String, ?> configs) {
+ Properties properties = new Properties();
+ properties.putAll(configs);
+ init(properties);
+ }
+
+ /**
+ * Parses and formats a record for display
+ * @param consumerRecord the record to format
+ * @param output the print stream used to output the record
+ */
+ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output);
+
+ /**
+ * Closes the formatter
+ */
+ default public void close() {}
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
index abdc64c..68e6441 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
@@ -83,17 +83,17 @@ public class OffsetSync {
return buffer;
}
- static OffsetSync deserializeRecord(ConsumerRecord<byte[], byte[]> record) {
+ public static OffsetSync deserializeRecord(ConsumerRecord<byte[], byte[]> record) {
Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(record.key()));
String topic = keyStruct.getString(TOPIC_KEY);
int partition = keyStruct.getInt(PARTITION_KEY);
-
+
Struct valueStruct = VALUE_SCHEMA.read(ByteBuffer.wrap(record.value()));
long upstreamOffset = valueStruct.getLong(UPSTREAM_OFFSET_KEY);
long downstreamOffset = valueStruct.getLong(DOWNSTREAM_OFFSET_KEY);
-
+
return new OffsetSync(new TopicPartition(topic, partition), upstreamOffset, downstreamOffset);
- }
+ }
private Struct valueStruct() {
Struct struct = new Struct(VALUE_SCHEMA);
@@ -116,5 +116,5 @@ public class OffsetSync {
byte[] recordValue() {
return serializeValue().array();
}
-};
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/CheckpointFormatter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/CheckpointFormatter.java
new file mode 100644
index 0000000..33fe695
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/CheckpointFormatter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.formatters;
+
+import java.io.PrintStream;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.connect.mirror.Checkpoint;
+
+public class CheckpointFormatter implements MessageFormatter {
+
+ @Override
+ public void writeTo(ConsumerRecord<byte[], byte[]> record, PrintStream output) {
+ output.println(Checkpoint.deserializeRecord(record));
+ }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/HeartbeatFormatter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/HeartbeatFormatter.java
new file mode 100644
index 0000000..a193dbe
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/HeartbeatFormatter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.formatters;
+
+import java.io.PrintStream;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.connect.mirror.Heartbeat;
+
+public class HeartbeatFormatter implements MessageFormatter {
+
+ @Override
+ public void writeTo(ConsumerRecord<byte[], byte[]> record, PrintStream output) {
+ output.println(Heartbeat.deserializeRecord(record));
+ }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/OffsetSyncFormatter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/OffsetSyncFormatter.java
new file mode 100644
index 0000000..dacae60
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/OffsetSyncFormatter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.formatters;
+
+import java.io.PrintStream;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.connect.mirror.OffsetSync;
+
+public class OffsetSyncFormatter implements MessageFormatter {
+
+ @Override
+ public void writeTo(ConsumerRecord<byte[], byte[]> record, PrintStream output) {
+ output.println(OffsetSync.deserializeRecord(record));
+ }
+}
diff --git a/core/src/main/scala/kafka/common/MessageFormatter.scala b/core/src/main/scala/kafka/common/MessageFormatter.scala
index 9f6c3fa..7826eb7 100644
--- a/core/src/main/scala/kafka/common/MessageFormatter.scala
+++ b/core/src/main/scala/kafka/common/MessageFormatter.scala
@@ -17,23 +17,12 @@
package kafka.common
-import java.io.PrintStream
-import java.util.Properties
-
-import org.apache.kafka.clients.consumer.ConsumerRecord
-
/**
* Typical implementations of this interface convert a `ConsumerRecord` into a type that can then be passed to
* a `PrintStream`.
*
* This is used by the `ConsoleConsumer`.
*/
-trait MessageFormatter {
-
- def init(props: Properties): Unit = {}
-
- def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit
-
- def close(): Unit = {}
-
+@deprecated("This class is deprecated and will be replaced by org.apache.kafka.common.MessageFormatter.", "2.7.0")
+trait MessageFormatter extends org.apache.kafka.common.MessageFormatter {
}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 6883ceb..c77f702 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -27,7 +27,7 @@ import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, KAFKA_2_3_IV0}
-import kafka.common.{MessageFormatter, OffsetAndMetadata}
+import kafka.common.OffsetAndMetadata
import kafka.log.AppendOrigin
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{FetchLogEnd, ReplicaManager}
@@ -47,7 +47,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicPartition}
import scala.collection._
import scala.collection.mutable.ArrayBuffer
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index 829e9e6..7fc1f3b 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -20,12 +20,11 @@ import java.io.PrintStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
-import kafka.common.MessageFormatter
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.protocol.types.Type._
import org.apache.kafka.common.protocol.types._
import org.apache.kafka.common.record.{CompressionType, Record, RecordBatch}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicPartition}
import scala.collection.mutable
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 5f7b9ab..8427ae1 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -22,15 +22,14 @@ import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.regex.Pattern
-import java.util.{Collections, Locale, Properties, Random}
+import java.util.{Collections, Locale, Map, Properties, Random}
import com.typesafe.scalalogging.LazyLogging
import joptsimple._
-import kafka.common.MessageFormatter
import kafka.utils.Implicits._
import kafka.utils.{Exit, _}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{MessageFormatter, TopicPartition}
import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException, WakeupException}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.requests.ListOffsetRequest
@@ -309,7 +308,7 @@ object ConsoleConsumer extends Logging {
formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
}
- formatter.init(formatterArgs)
+ formatter.configure(formatterArgs.asScala.asJava)
val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has)
if (topicOrFilterOpt.size != 1)
@@ -466,7 +465,9 @@ class DefaultMessageFormatter extends MessageFormatter {
var keyDeserializer: Option[Deserializer[_]] = None
var valueDeserializer: Option[Deserializer[_]] = None
- override def init(props: Properties): Unit = {
+ override def configure(configs: Map[String, _]): Unit = {
+ val props = new java.util.Properties()
+ configs.asScala.foreach { case (key, value) => props.put(key, value.toString) }
if (props.containsKey("print.timestamp"))
printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
if (props.containsKey("print.key"))
@@ -548,7 +549,7 @@ class DefaultMessageFormatter extends MessageFormatter {
class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
- override def init(props: Properties): Unit = defaultWriter.init(props)
+ override def configure(configs: Map[String, _]): Unit = defaultWriter.configure(configs)
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
import consumerRecord._
@@ -560,7 +561,6 @@ class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
}
class NoOpMessageFormatter extends MessageFormatter {
- override def init(props: Properties): Unit = {}
def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
}
@@ -568,12 +568,11 @@ class NoOpMessageFormatter extends MessageFormatter {
class ChecksumMessageFormatter extends MessageFormatter {
private var topicStr: String = _
- override def init(props: Properties): Unit = {
- topicStr = props.getProperty("topic")
- if (topicStr != null)
- topicStr = topicStr + ":"
+ override def configure(configs: Map[String, _]): Unit = {
+ topicStr = if (configs.containsKey("topic"))
+ configs.get("topic").toString + ":"
else
- topicStr = ""
+ ""
}
@nowarn("cat=deprecation")
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index ccbf078..e930ad5 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -17,14 +17,15 @@
package kafka.tools
-import java.io.PrintStream
+import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.file.Files
+import java.util.{HashMap, Map => JMap}
-import kafka.common.MessageFormatter
import kafka.tools.ConsoleConsumer.ConsumerWrapper
import kafka.utils.{Exit, TestUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{MessageFormatter, TopicPartition}
+import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.test.MockDeserializer
import org.mockito.Mockito._
@@ -495,4 +496,70 @@ class ConsoleConsumerTest {
Exit.resetExitProcedure()
}
}
+
+ @Test
+ def testDefaultMessageFormatter(): Unit = {
+ val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
+ val formatter = new DefaultMessageFormatter()
+ val configs: JMap[String, String] = new HashMap()
+
+ formatter.configure(configs)
+ var out = new ByteArrayOutputStream()
+ formatter.writeTo(record, new PrintStream(out))
+ assertEquals("value\n", out.toString)
+
+ configs.put("print.key", "true")
+ formatter.configure(configs)
+ out = new ByteArrayOutputStream()
+ formatter.writeTo(record, new PrintStream(out))
+ assertEquals("key\tvalue\n", out.toString)
+
+ configs.put("print.partition", "true")
+ formatter.configure(configs)
+ out = new ByteArrayOutputStream()
+ formatter.writeTo(record, new PrintStream(out))
+ assertEquals("key\tvalue\t0\n", out.toString)
+
+ configs.put("print.timestamp", "true")
+ formatter.configure(configs)
+ out = new ByteArrayOutputStream()
+ formatter.writeTo(record, new PrintStream(out))
+ assertEquals("NO_TIMESTAMP\tkey\tvalue\t0\n", out.toString)
+
+ out = new ByteArrayOutputStream()
+ val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, 321L, -1, -1, "key".getBytes, "value".getBytes)
+ formatter.writeTo(record2, new PrintStream(out))
+ assertEquals("CreateTime:123\tkey\tvalue\t0\n", out.toString)
+ formatter.close()
+ }
+
+ @Test
+ def testNoOpMessageFormatter(): Unit = {
+ val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
+ val formatter = new NoOpMessageFormatter()
+
+ formatter.configure(new HashMap())
+ val out = new ByteArrayOutputStream()
+ formatter.writeTo(record, new PrintStream(out))
+ assertEquals("", out.toString)
+ }
+
+ @Test
+ def testChecksumMessageFormatter(): Unit = {
+ val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
+ val formatter = new ChecksumMessageFormatter()
+ val configs: JMap[String, String] = new HashMap()
+
+ formatter.configure(configs)
+ var out = new ByteArrayOutputStream()
+ formatter.writeTo(record, new PrintStream(out))
+ assertEquals("checksum:-1\n", out.toString)
+
+ configs.put("topic", "topic1")
+ formatter.configure(configs)
+ out = new ByteArrayOutputStream()
+ formatter.writeTo(record, new PrintStream(out))
+ assertEquals("topic1:checksum:-1\n", out.toString)
+ }
+
}
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index e65d400..f144165 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -190,6 +190,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
</Match>
<Match>
+ <!-- Keeping this class for compatibility. It's deprecated and will be removed in the next major release -->
+ <Source name="MessageFormatter.scala"/>
+ <Package name="kafka.common"/>
+ <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE"/>
+ </Match>
+
+ <Match>
<!-- Suppress a warning about some static initializers in Schema using instances of a
subclass. -->
<Or>