You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2020/07/03 09:42:41 UTC

[kafka] branch trunk updated: KAFKA-10232: MirrorMaker2 internal topics Formatters KIP-597 (#8604)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new caa806c  KAFKA-10232: MirrorMaker2 internal topics Formatters KIP-597 (#8604)
caa806c is described below

commit caa806cd82fb9fa88510c81de53e69ac9846311d
Author: Mickael Maison <mi...@users.noreply.github.com>
AuthorDate: Fri Jul 3 10:41:45 2020 +0100

    KAFKA-10232: MirrorMaker2 internal topics Formatters KIP-597 (#8604)
    
    This PR includes 3 MessageFormatters for MirrorMaker2 internal topics:
    - HeartbeatFormatter
    - CheckpointFormatter
    - OffsetSyncFormatter
    
    This also introduces a new public interface org.apache.kafka.common.MessageFormatter that users can implement to build custom formatters.
    
    Reviewers: Konstantine Karantasis <k....@gmail.com>, Ryanne Dolan <ry...@gmail.com>, David Jacot <dj...@confluent.io>
    
    Co-authored-by: Mickael Maison <mi...@gmail.com>
    Co-authored-by: Edoardo Comar <ec...@uk.ibm.com>
---
 checkstyle/import-control.xml                      |  1 +
 .../org/apache/kafka/common/MessageFormatter.java  | 66 +++++++++++++++++++
 .../apache/kafka/connect/mirror/OffsetSync.java    | 10 +--
 .../mirror/formatters/CheckpointFormatter.java     | 31 +++++++++
 .../mirror/formatters/HeartbeatFormatter.java      | 31 +++++++++
 .../mirror/formatters/OffsetSyncFormatter.java     | 31 +++++++++
 .../main/scala/kafka/common/MessageFormatter.scala | 15 +----
 .../coordinator/group/GroupMetadataManager.scala   |  4 +-
 .../coordinator/transaction/TransactionLog.scala   |  3 +-
 .../main/scala/kafka/tools/ConsoleConsumer.scala   | 23 ++++---
 .../unit/kafka/tools/ConsoleConsumerTest.scala     | 73 +++++++++++++++++++++-
 gradle/spotbugs-exclude.xml                        |  7 +++
 12 files changed, 258 insertions(+), 37 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b990081..f45e362 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -48,6 +48,7 @@
   <allow pkg="org.apache.kafka.common.memory" />
 
   <subpackage name="common">
+    <allow class="org.apache.kafka.clients.consumer.ConsumerRecord" exact-match="true" />
     <disallow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.common" exact-match="true" />
     <allow pkg="org.apache.kafka.common.annotation" />
diff --git a/clients/src/main/java/org/apache/kafka/common/MessageFormatter.java b/clients/src/main/java/org/apache/kafka/common/MessageFormatter.java
new file mode 100644
index 0000000..c4a255f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/MessageFormatter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.io.Closeable;
+import java.io.PrintStream;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * Defines formatters that parse and format records read by a Consumer instance for display.
+ * Only {@link #writeTo} must be implemented; the other methods declared here have default implementations.
+ * The kafka-console-consumer has built-in support for MessageFormatter, via the --formatter flag.
+ *
+ * Kafka provides a few implementations to display records of internal topics such as __consumer_offsets,
+ * __transaction_state and the MirrorMaker2 topics.
+ *
+ */
+public interface MessageFormatter extends Configurable, Closeable {
+
+    /**
+     * Initialises the MessageFormatter. The default implementation does nothing.
+     * @param props Properties to configure the formatter
+     * @deprecated Use {@link #configure(Map)} instead, this method is for backward compatibility with the older Formatter interface
+     */
+    @Deprecated
+    default public void init(Properties props) {}
+
+    /**
+     * Configures the MessageFormatter. By default, copies {@code configs} into a new {@link Properties} and passes it to {@link #init(Properties)}.
+     * @param configs Map to configure the formatter
+     */
+    default public void configure(Map<String, ?> configs) {
+        Properties properties = new Properties();
+        properties.putAll(configs); // NOTE(review): Properties presumably rejects null keys/values - confirm callers never pass nulls
+        init(properties);
+    }
+
+    /**
+     * Parses and formats a record for display
+     * @param consumerRecord the record to format, with key and value as raw byte arrays
+     * @param output the print stream used to output the record
+     */
+    public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output);
+
+    /**
+     * Closes the formatter. The default implementation does nothing.
+     */
+    default public void close() {}
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
index abdc64c..68e6441 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSync.java
@@ -83,17 +83,17 @@ public class OffsetSync {
         return buffer;
     }
 
-    static OffsetSync deserializeRecord(ConsumerRecord<byte[], byte[]> record) {
+    public static OffsetSync deserializeRecord(ConsumerRecord<byte[], byte[]> record) {
         Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(record.key()));
         String topic = keyStruct.getString(TOPIC_KEY);
         int partition = keyStruct.getInt(PARTITION_KEY);
-        
+
         Struct valueStruct = VALUE_SCHEMA.read(ByteBuffer.wrap(record.value()));
         long upstreamOffset = valueStruct.getLong(UPSTREAM_OFFSET_KEY);
         long downstreamOffset = valueStruct.getLong(DOWNSTREAM_OFFSET_KEY);
-        
+
         return new OffsetSync(new TopicPartition(topic, partition), upstreamOffset, downstreamOffset);
-    } 
+    }
 
     private Struct valueStruct() {
         Struct struct = new Struct(VALUE_SCHEMA);
@@ -116,5 +116,5 @@ public class OffsetSync {
     byte[] recordValue() {
         return serializeValue().array();
     }
-};
+}
 
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/CheckpointFormatter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/CheckpointFormatter.java
new file mode 100644
index 0000000..33fe695
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/CheckpointFormatter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.formatters;
+
+import java.io.PrintStream;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.connect.mirror.Checkpoint;
+
+public class CheckpointFormatter implements MessageFormatter {
+    /** Prints the result of {@code Checkpoint.deserializeRecord(record)} to {@code output}, followed by a line terminator. */
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> record, PrintStream output) {
+        output.println(Checkpoint.deserializeRecord(record));
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/HeartbeatFormatter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/HeartbeatFormatter.java
new file mode 100644
index 0000000..a193dbe
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/HeartbeatFormatter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.formatters;
+
+import java.io.PrintStream;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.connect.mirror.Heartbeat;
+
+public class HeartbeatFormatter implements MessageFormatter {
+    /** Prints the result of {@code Heartbeat.deserializeRecord(record)} to {@code output}, followed by a line terminator. */
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> record, PrintStream output) {
+        output.println(Heartbeat.deserializeRecord(record));
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/OffsetSyncFormatter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/OffsetSyncFormatter.java
new file mode 100644
index 0000000..dacae60
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/formatters/OffsetSyncFormatter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.formatters;
+
+import java.io.PrintStream;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.connect.mirror.OffsetSync;
+
+public class OffsetSyncFormatter implements MessageFormatter {
+    /** Prints the result of {@code OffsetSync.deserializeRecord(record)} to {@code output}, followed by a line terminator. */
+    @Override
+    public void writeTo(ConsumerRecord<byte[], byte[]> record, PrintStream output) {
+        output.println(OffsetSync.deserializeRecord(record));
+    }
+}
diff --git a/core/src/main/scala/kafka/common/MessageFormatter.scala b/core/src/main/scala/kafka/common/MessageFormatter.scala
index 9f6c3fa..7826eb7 100644
--- a/core/src/main/scala/kafka/common/MessageFormatter.scala
+++ b/core/src/main/scala/kafka/common/MessageFormatter.scala
@@ -17,23 +17,12 @@
 
 package kafka.common
 
-import java.io.PrintStream
-import java.util.Properties
-
-import org.apache.kafka.clients.consumer.ConsumerRecord
-
 /**
   * Typical implementations of this interface convert a `ConsumerRecord` into a type that can then be passed to
   * a `PrintStream`.
   *
   * This is used by the `ConsoleConsumer`.
   */
-trait MessageFormatter {
-
-  def init(props: Properties): Unit = {}
-
-  def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit
-
-  def close(): Unit = {}
-
+@deprecated("This class is deprecated and will be replaced by org.apache.kafka.common.MessageFormatter.", "2.7.0")
+trait MessageFormatter extends org.apache.kafka.common.MessageFormatter {
 }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 6883ceb..c77f702 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -27,7 +27,7 @@ import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, KAFKA_2_3_IV0}
-import kafka.common.{MessageFormatter, OffsetAndMetadata}
+import kafka.common.OffsetAndMetadata
 import kafka.log.AppendOrigin
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{FetchLogEnd, ReplicaManager}
@@ -47,7 +47,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicPartition}
 
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
index 829e9e6..7fc1f3b 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -20,12 +20,11 @@ import java.io.PrintStream
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
 
-import kafka.common.MessageFormatter
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.protocol.types.Type._
 import org.apache.kafka.common.protocol.types._
 import org.apache.kafka.common.record.{CompressionType, Record, RecordBatch}
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.{KafkaException, MessageFormatter, TopicPartition}
 
 import scala.collection.mutable
 
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 5f7b9ab..8427ae1 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -22,15 +22,14 @@ import java.nio.charset.StandardCharsets
 import java.time.Duration
 import java.util.concurrent.CountDownLatch
 import java.util.regex.Pattern
-import java.util.{Collections, Locale, Properties, Random}
+import java.util.{Collections, Locale, Map, Properties, Random}
 
 import com.typesafe.scalalogging.LazyLogging
 import joptsimple._
-import kafka.common.MessageFormatter
 import kafka.utils.Implicits._
 import kafka.utils.{Exit, _}
 import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{MessageFormatter, TopicPartition}
 import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException, WakeupException}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.requests.ListOffsetRequest
@@ -309,7 +308,7 @@ object ConsoleConsumer extends Logging {
       formatterArgs.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
     }
 
-    formatter.init(formatterArgs)
+    formatter.configure(formatterArgs.asScala.asJava)
 
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt).filter(options.has)
     if (topicOrFilterOpt.size != 1)
@@ -466,7 +465,9 @@ class DefaultMessageFormatter extends MessageFormatter {
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None
 
-  override def init(props: Properties): Unit = {
+  override def configure(configs: Map[String, _]): Unit = {
+    val props = new java.util.Properties()
+    configs.asScala.foreach { case (key, value) => props.put(key, value.toString) }
     if (props.containsKey("print.timestamp"))
       printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
     if (props.containsKey("print.key"))
@@ -548,7 +549,7 @@ class DefaultMessageFormatter extends MessageFormatter {
 class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
   private val defaultWriter: DefaultMessageFormatter = new DefaultMessageFormatter
 
-  override def init(props: Properties): Unit = defaultWriter.init(props)
+  override def configure(configs: Map[String, _]): Unit = defaultWriter.configure(configs)
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {
     import consumerRecord._
@@ -560,7 +561,6 @@ class LoggingMessageFormatter extends MessageFormatter with LazyLogging {
 }
 
 class NoOpMessageFormatter extends MessageFormatter {
-  override def init(props: Properties): Unit = {}
 
   def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = {}
 }
@@ -568,12 +568,11 @@ class NoOpMessageFormatter extends MessageFormatter {
 class ChecksumMessageFormatter extends MessageFormatter {
   private var topicStr: String = _
 
-  override def init(props: Properties): Unit = {
-    topicStr = props.getProperty("topic")
-    if (topicStr != null)
-      topicStr = topicStr + ":"
+  override def configure(configs: Map[String, _]): Unit = {
+    topicStr = if (configs.containsKey("topic"))
+      configs.get("topic").toString + ":"
     else
-      topicStr = ""
+      ""
   }
 
   @nowarn("cat=deprecation")
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index ccbf078..e930ad5 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -17,14 +17,15 @@
 
 package kafka.tools
 
-import java.io.PrintStream
+import java.io.{ByteArrayOutputStream, PrintStream}
 import java.nio.file.Files
+import java.util.{HashMap, Map => JMap}
 
-import kafka.common.MessageFormatter
 import kafka.tools.ConsoleConsumer.ConsumerWrapper
 import kafka.utils.{Exit, TestUtils}
 import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{MessageFormatter, TopicPartition}
+import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.test.MockDeserializer
 import org.mockito.Mockito._
@@ -495,4 +496,70 @@ class ConsoleConsumerTest {
       Exit.resetExitProcedure()
     }
   }
+
+  @Test
+  def testDefaultMessageFormatter(): Unit = {
+    val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
+    val formatter = new DefaultMessageFormatter()
+    val configs: JMap[String, String] = new HashMap()
+    // Empty configuration: only the value is expected in the output
+    formatter.configure(configs)
+    var out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("value\n", out.toString)
+    // print.key=true: the key is expected before the value, tab-separated
+    configs.put("print.key", "true")
+    formatter.configure(configs)
+    out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("key\tvalue\n", out.toString)
+    // print.partition=true: the partition number is expected after the value
+    configs.put("print.partition", "true")
+    formatter.configure(configs)
+    out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("key\tvalue\t0\n", out.toString)
+    // print.timestamp=true: the record built with the 5-argument constructor is expected to show NO_TIMESTAMP first
+    configs.put("print.timestamp", "true")
+    formatter.configure(configs)
+    out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("NO_TIMESTAMP\tkey\tvalue\t0\n", out.toString)
+    // Same configuration, with a record carrying a CREATE_TIME timestamp of 123
+    out = new ByteArrayOutputStream()
+    val record2 = new ConsumerRecord("topic", 0, 123, 123L, TimestampType.CREATE_TIME, 321L, -1, -1, "key".getBytes, "value".getBytes)
+    formatter.writeTo(record2, new PrintStream(out))
+    assertEquals("CreateTime:123\tkey\tvalue\t0\n", out.toString)
+    formatter.close()
+  }
+
+  @Test
+  def testNoOpMessageFormatter(): Unit = {
+    val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
+    val formatter = new NoOpMessageFormatter()
+    // After configuring with an empty map, writing a record is expected to produce no output
+    formatter.configure(new HashMap())
+    val out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("", out.toString)
+  }
+
+  @Test
+  def testChecksumMessageFormatter(): Unit = {
+    val record = new ConsumerRecord("topic", 0, 123, "key".getBytes, "value".getBytes)
+    val formatter = new ChecksumMessageFormatter()
+    val configs: JMap[String, String] = new HashMap()
+    // Without a "topic" entry the output is expected to carry no topic prefix
+    formatter.configure(configs)
+    var out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("checksum:-1\n", out.toString)
+    // With "topic" set, the configured topic followed by ":" is expected as a prefix
+    configs.put("topic", "topic1")
+    formatter.configure(configs)
+    out = new ByteArrayOutputStream()
+    formatter.writeTo(record, new PrintStream(out))
+    assertEquals("topic1:checksum:-1\n", out.toString)
+  }
+
 }
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index e65d400..f144165 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -190,6 +190,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
     </Match>
 
     <Match>
+        <!-- Keeping this class for compatibility. It's deprecated and will be removed in the next major release -->
+        <Source name="MessageFormatter.scala"/>
+        <Package name="kafka.common"/>
+        <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE"/>
+    </Match>
+
+    <Match>
         <!-- Suppress a warning about some static initializers in Schema using instances of a
              subclass. -->
         <Or>