You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/08/25 06:09:09 UTC

[kafka] branch trunk updated: KAFKA-13850: Show missing record type in MetadataShell (#12103)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 19581effbf9 KAFKA-13850: Show missing record type in MetadataShell (#12103)
19581effbf9 is described below

commit 19581effbf9265db318f855cc37f6b0526c1b544
Author: dengziming <de...@gmail.com>
AuthorDate: Thu Aug 25 14:09:01 2022 +0800

    KAFKA-13850: Show missing record type in MetadataShell (#12103)
    
    AccessControlEntryRecord and RemoveAccessControlEntryRecord were added in KIP-801, FeatureLevelRecord was added in KIP-778, BrokerRegistrationChangeRecord was added in KIP-841, and NoOpRecord was added in KIP-835. This commit adds these 5 record types to MetadataShell.
    
     Reviewers: Luke Chen <sh...@gmail.com>
---
 checkstyle/suppressions.xml                        |   4 +
 .../common/metadata/FeatureLevelRecord.json        |   2 +-
 .../apache/kafka/shell/MetadataNodeManager.java    |  53 +++++++++++
 .../kafka/shell/MetadataNodeManagerTest.java       | 106 +++++++++++++++++++++
 4 files changed, 164 insertions(+), 1 deletion(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index bec3da1637a..a891cf647f7 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -278,6 +278,10 @@
     <!-- Shell -->
     <suppress checks="CyclomaticComplexity"
               files="(GlobComponent|MetadataNodeManager).java"/>
+    <suppress checks="MethodLength"
+              files="(MetadataNodeManager).java"/>
+    <suppress checks="JavaNCSS"
+              files="(MetadataNodeManager).java"/>
 
     <!-- Log4J-Appender -->
     <suppress checks="CyclomaticComplexity"
diff --git a/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json b/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
index 03ff347eb82..29cdd9b467f 100644
--- a/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
+++ b/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
@@ -23,6 +23,6 @@
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The feature name." },
     { "name": "FeatureLevel", "type": "int16", "versions": "0+",
-      "about": "The current finalized feature level of this feature for the cluster." }
+      "about": "The current finalized feature level of this feature for the cluster, a value of 0 means feature not supported." }
   ]
 }
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
index 9d4941f8020..cc8d30940a6 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -21,9 +21,14 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.AccessControlEntryRecordJsonConverter;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
 import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecordJsonConverter;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.MetadataRecordType;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
@@ -31,6 +36,7 @@ import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
@@ -39,6 +45,8 @@ import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
 import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.raft.Batch;
@@ -302,6 +310,22 @@ public final class MetadataNodeManager implements AutoCloseable {
                     create("isFenced").setContents("false");
                 break;
             }
+            case BROKER_REGISTRATION_CHANGE_RECORD: {
+                BrokerRegistrationChangeRecord record = (BrokerRegistrationChangeRecord) message;
+                BrokerRegistrationFencingChange fencingChange =
+                    BrokerRegistrationFencingChange.fromValue(record.fenced()).get();
+                if (fencingChange != BrokerRegistrationFencingChange.NONE) {
+                    data.root.mkdirs("brokers", Integer.toString(record.brokerId()))
+                        .create("isFenced").setContents(Boolean.toString(fencingChange.asBoolean().get()));
+                }
+                BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
+                    BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).get();
+                if (inControlledShutdownChange != BrokerRegistrationInControlledShutdownChange.NONE) {
+                    data.root.mkdirs("brokers", Integer.toString(record.brokerId()))
+                        .create("inControlledShutdown").setContents(Boolean.toString(inControlledShutdownChange.asBoolean().get()));
+                }
+                break;
+            }
             case REMOVE_TOPIC_RECORD: {
                 RemoveTopicRecord record = (RemoveTopicRecord) message;
                 DirectoryNode topicsDirectory =
@@ -333,6 +357,35 @@ public final class MetadataNodeManager implements AutoCloseable {
                 producerIds.create("nextBlockStartId").setContents(record.nextProducerId() + "");
                 break;
             }
+            case ACCESS_CONTROL_ENTRY_RECORD: {
+                AccessControlEntryRecord record = (AccessControlEntryRecord) message;
+                DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
+                FileNode file = acls.create(record.id().toString());
+                file.setContents(AccessControlEntryRecordJsonConverter.write(record,
+                    AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
+                break;
+            }
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: {
+                RemoveAccessControlEntryRecord record = (RemoveAccessControlEntryRecord) message;
+                DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
+                acls.rmrf(record.id().toString());
+                break;
+            }
+            case FEATURE_LEVEL_RECORD: {
+                FeatureLevelRecord record = (FeatureLevelRecord) message;
+                DirectoryNode features = data.root.mkdirs("features");
+                if (record.featureLevel() == 0) {
+                    features.rmrf(record.name());
+                } else {
+                    FileNode file = features.create(record.name());
+                    file.setContents(FeatureLevelRecordJsonConverter.write(record,
+                        FeatureLevelRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
+                }
+                break;
+            }
+            case NO_OP_RECORD: {
+                break;
+            }
             default:
                 throw new RuntimeException("Unhandled metadata record type");
         }
diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
index c580e1d5c27..59008280ac1 100644
--- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
+++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
@@ -18,19 +18,31 @@
 package org.apache.kafka.shell;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.AccessControlEntryRecordJsonConverter;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecordJsonConverter;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -256,6 +268,61 @@ public class MetadataNodeManagerTest {
             metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
     }
 
+    @Test
+    public void testBrokerRegistrationChangeRecord() {
+        RegisterBrokerRecord record = new RegisterBrokerRecord()
+            .setBrokerId(1)
+            .setBrokerEpoch(2);
+        metadataNodeManager.handleMessage(record);
+        assertEquals("true",
+            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+        // Unfence broker
+        BrokerRegistrationChangeRecord record1 = new BrokerRegistrationChangeRecord()
+            .setBrokerId(1)
+            .setBrokerEpoch(2)
+            .setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
+        metadataNodeManager.handleMessage(record1);
+        assertEquals("false",
+            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+        // Fence broker
+        BrokerRegistrationChangeRecord record2 = new BrokerRegistrationChangeRecord()
+            .setBrokerId(1)
+            .setBrokerEpoch(2)
+            .setFenced(BrokerRegistrationFencingChange.FENCE.value());
+        metadataNodeManager.handleMessage(record2);
+        assertEquals("true",
+            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+        // Unchanged
+        BrokerRegistrationChangeRecord record3 = new BrokerRegistrationChangeRecord()
+            .setBrokerId(1)
+            .setBrokerEpoch(2)
+            .setFenced(BrokerRegistrationFencingChange.NONE.value());
+        metadataNodeManager.handleMessage(record3);
+        assertEquals("true",
+            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+        // Controlled shutdown
+        BrokerRegistrationChangeRecord record4 = new BrokerRegistrationChangeRecord()
+            .setBrokerId(1)
+            .setBrokerEpoch(2)
+            .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
+        metadataNodeManager.handleMessage(record4);
+        assertEquals("true",
+            metadataNodeManager.getData().root().directory("brokers", "1").file("inControlledShutdown").contents());
+
+        // Unchanged
+        BrokerRegistrationChangeRecord record5 = new BrokerRegistrationChangeRecord()
+            .setBrokerId(1)
+            .setBrokerEpoch(2)
+            .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.NONE.value());
+        metadataNodeManager.handleMessage(record5);
+        assertEquals("true",
+            metadataNodeManager.getData().root().directory("brokers", "1").file("inControlledShutdown").contents());
+    }
+
     @Test
     public void testClientQuotaRecord() {
         ClientQuotaRecord record = new ClientQuotaRecord()
@@ -336,4 +403,43 @@ public class MetadataNodeManagerTest {
             11000 + "",
             metadataNodeManager.getData().root().directory("producerIds").file("nextBlockStartId").contents());
     }
+
+    @Test
+    public void testAccessControlEntryRecordAndRemoveAccessControlEntryRecord() {
+        AccessControlEntryRecord record1 = new AccessControlEntryRecord()
+            .setId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
+            .setHost("example.com")
+            .setResourceType(ResourceType.GROUP.code())
+            .setResourceName("group")
+            .setOperation(AclOperation.READ.code())
+            .setPermissionType(AclPermissionType.ALLOW.code())
+            .setPrincipal("User:kafka")
+            .setPatternType(PatternType.LITERAL.code());
+        metadataNodeManager.handleMessage(record1);
+        assertEquals(
+            AccessControlEntryRecordJsonConverter.write(record1, AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
+            metadataNodeManager.getData().root().directory("acl").directory("id").file("GcaQDl2UTsCNs1p9s37XkQ").contents());
+
+        RemoveAccessControlEntryRecord record2 = new RemoveAccessControlEntryRecord()
+            .setId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"));
+        metadataNodeManager.handleMessage(record2);
+        assertFalse(metadataNodeManager.getData().root().directory("acl").directory("id").children().containsKey("GcaQDl2UTsCNs1p9s37XkQ"));
+    }
+
+    @Test
+    public void testFeatureLevelRecord() {
+        FeatureLevelRecord record1 = new FeatureLevelRecord()
+            .setName("metadata.version")
+            .setFeatureLevel((short) 3);
+        metadataNodeManager.handleMessage(record1);
+        assertEquals(
+            FeatureLevelRecordJsonConverter.write(record1, FeatureLevelRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
+            metadataNodeManager.getData().root().directory("features").file("metadata.version").contents());
+
+        FeatureLevelRecord record2 = new FeatureLevelRecord()
+            .setName("metadata.version")
+            .setFeatureLevel((short) 0);
+        metadataNodeManager.handleMessage(record2);
+        assertFalse(metadataNodeManager.getData().root().directory("features").children().containsKey("metadata.version"));
+    }
 }