You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/08/25 06:09:09 UTC
[kafka] branch trunk updated: KAFKA-13850: Show missing record type in MetadataShell (#12103)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 19581effbf9 KAFKA-13850: Show missing record type in MetadataShell (#12103)
19581effbf9 is described below
commit 19581effbf9265db318f855cc37f6b0526c1b544
Author: dengziming <de...@gmail.com>
AuthorDate: Thu Aug 25 14:09:01 2022 +0800
KAFKA-13850: Show missing record type in MetadataShell (#12103)
AccessControlEntryRecord and RemoveAccessControlEntryRecord were added in KIP-801, FeatureLevelRecord was added in KIP-778, BrokerRegistrationChangeRecord was added in KIP-841, and NoOpRecord was added in KIP-835. This change adds handling for these 5 record types in MetadataShell.
Reviewers: Luke Chen <sh...@gmail.com>
---
checkstyle/suppressions.xml | 4 +
.../common/metadata/FeatureLevelRecord.json | 2 +-
.../apache/kafka/shell/MetadataNodeManager.java | 53 +++++++++++
.../kafka/shell/MetadataNodeManagerTest.java | 106 +++++++++++++++++++++
4 files changed, 164 insertions(+), 1 deletion(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index bec3da1637a..a891cf647f7 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -278,6 +278,10 @@
<!-- Shell -->
<suppress checks="CyclomaticComplexity"
files="(GlobComponent|MetadataNodeManager).java"/>
+ <suppress checks="MethodLength"
+ files="(MetadataNodeManager).java"/>
+ <suppress checks="JavaNCSS"
+ files="(MetadataNodeManager).java"/>
<!-- Log4J-Appender -->
<suppress checks="CyclomaticComplexity"
diff --git a/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json b/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
index 03ff347eb82..29cdd9b467f 100644
--- a/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
+++ b/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
@@ -23,6 +23,6 @@
{ "name": "Name", "type": "string", "versions": "0+",
"about": "The feature name." },
{ "name": "FeatureLevel", "type": "int16", "versions": "0+",
- "about": "The current finalized feature level of this feature for the cluster." }
+ "about": "The current finalized feature level of this feature for the cluster, a value of 0 means feature not supported." }
]
}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
index 9d4941f8020..cc8d30940a6 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -21,9 +21,14 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.AccessControlEntryRecordJsonConverter;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecordJsonConverter;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
@@ -31,6 +36,7 @@ import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
@@ -39,6 +45,8 @@ import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
@@ -302,6 +310,22 @@ public final class MetadataNodeManager implements AutoCloseable {
create("isFenced").setContents("false");
break;
}
+ case BROKER_REGISTRATION_CHANGE_RECORD: {
+ BrokerRegistrationChangeRecord record = (BrokerRegistrationChangeRecord) message;
+ BrokerRegistrationFencingChange fencingChange =
+ BrokerRegistrationFencingChange.fromValue(record.fenced()).get();
+ if (fencingChange != BrokerRegistrationFencingChange.NONE) {
+ data.root.mkdirs("brokers", Integer.toString(record.brokerId()))
+ .create("isFenced").setContents(Boolean.toString(fencingChange.asBoolean().get()));
+ }
+ BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
+ BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).get();
+ if (inControlledShutdownChange != BrokerRegistrationInControlledShutdownChange.NONE) {
+ data.root.mkdirs("brokers", Integer.toString(record.brokerId()))
+ .create("inControlledShutdown").setContents(Boolean.toString(inControlledShutdownChange.asBoolean().get()));
+ }
+ break;
+ }
case REMOVE_TOPIC_RECORD: {
RemoveTopicRecord record = (RemoveTopicRecord) message;
DirectoryNode topicsDirectory =
@@ -333,6 +357,35 @@ public final class MetadataNodeManager implements AutoCloseable {
producerIds.create("nextBlockStartId").setContents(record.nextProducerId() + "");
break;
}
+ case ACCESS_CONTROL_ENTRY_RECORD: {
+ AccessControlEntryRecord record = (AccessControlEntryRecord) message;
+ DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
+ FileNode file = acls.create(record.id().toString());
+ file.setContents(AccessControlEntryRecordJsonConverter.write(record,
+ AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
+ break;
+ }
+ case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: {
+ RemoveAccessControlEntryRecord record = (RemoveAccessControlEntryRecord) message;
+ DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
+ acls.rmrf(record.id().toString());
+ break;
+ }
+ case FEATURE_LEVEL_RECORD: {
+ FeatureLevelRecord record = (FeatureLevelRecord) message;
+ DirectoryNode features = data.root.mkdirs("features");
+ if (record.featureLevel() == 0) {
+ features.rmrf(record.name());
+ } else {
+ FileNode file = features.create(record.name());
+ file.setContents(FeatureLevelRecordJsonConverter.write(record,
+ FeatureLevelRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
+ }
+ break;
+ }
+ case NO_OP_RECORD: {
+ break;
+ }
default:
throw new RuntimeException("Unhandled metadata record type");
}
diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
index c580e1d5c27..59008280ac1 100644
--- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
+++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
@@ -18,19 +18,31 @@
package org.apache.kafka.shell;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.AccessControlEntryRecord;
+import org.apache.kafka.common.metadata.AccessControlEntryRecordJsonConverter;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.FeatureLevelRecordJsonConverter;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -256,6 +268,61 @@ public class MetadataNodeManagerTest {
metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
}
+ @Test
+ public void testBrokerRegistrationChangeRecord() {
+ RegisterBrokerRecord record = new RegisterBrokerRecord()
+ .setBrokerId(1)
+ .setBrokerEpoch(2);
+ metadataNodeManager.handleMessage(record);
+ assertEquals("true",
+ metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+ // Unfence broker
+ BrokerRegistrationChangeRecord record1 = new BrokerRegistrationChangeRecord()
+ .setBrokerId(1)
+ .setBrokerEpoch(2)
+ .setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
+ metadataNodeManager.handleMessage(record1);
+ assertEquals("false",
+ metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+ // Fence broker
+ BrokerRegistrationChangeRecord record2 = new BrokerRegistrationChangeRecord()
+ .setBrokerId(1)
+ .setBrokerEpoch(2)
+ .setFenced(BrokerRegistrationFencingChange.FENCE.value());
+ metadataNodeManager.handleMessage(record2);
+ assertEquals("true",
+ metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+ // Unchanged
+ BrokerRegistrationChangeRecord record3 = new BrokerRegistrationChangeRecord()
+ .setBrokerId(1)
+ .setBrokerEpoch(2)
+ .setFenced(BrokerRegistrationFencingChange.NONE.value());
+ metadataNodeManager.handleMessage(record3);
+ assertEquals("true",
+ metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+ // Controlled shutdown
+ BrokerRegistrationChangeRecord record4 = new BrokerRegistrationChangeRecord()
+ .setBrokerId(1)
+ .setBrokerEpoch(2)
+ .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
+ metadataNodeManager.handleMessage(record4);
+ assertEquals("true",
+ metadataNodeManager.getData().root().directory("brokers", "1").file("inControlledShutdown").contents());
+
+ // Unchanged
+ BrokerRegistrationChangeRecord record5 = new BrokerRegistrationChangeRecord()
+ .setBrokerId(1)
+ .setBrokerEpoch(2)
+ .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.NONE.value());
+ metadataNodeManager.handleMessage(record5);
+ assertEquals("true",
+ metadataNodeManager.getData().root().directory("brokers", "1").file("inControlledShutdown").contents());
+ }
+
@Test
public void testClientQuotaRecord() {
ClientQuotaRecord record = new ClientQuotaRecord()
@@ -336,4 +403,43 @@ public class MetadataNodeManagerTest {
11000 + "",
metadataNodeManager.getData().root().directory("producerIds").file("nextBlockStartId").contents());
}
+
+ @Test
+ public void testAccessControlEntryRecordAndRemoveAccessControlEntryRecord() {
+ AccessControlEntryRecord record1 = new AccessControlEntryRecord()
+ .setId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
+ .setHost("example.com")
+ .setResourceType(ResourceType.GROUP.code())
+ .setResourceName("group")
+ .setOperation(AclOperation.READ.code())
+ .setPermissionType(AclPermissionType.ALLOW.code())
+ .setPrincipal("User:kafka")
+ .setPatternType(PatternType.LITERAL.code());
+ metadataNodeManager.handleMessage(record1);
+ assertEquals(
+ AccessControlEntryRecordJsonConverter.write(record1, AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
+ metadataNodeManager.getData().root().directory("acl").directory("id").file("GcaQDl2UTsCNs1p9s37XkQ").contents());
+
+ RemoveAccessControlEntryRecord record2 = new RemoveAccessControlEntryRecord()
+ .setId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"));
+ metadataNodeManager.handleMessage(record2);
+ assertFalse(metadataNodeManager.getData().root().directory("acl").directory("id").children().containsKey("GcaQDl2UTsCNs1p9s37XkQ"));
+ }
+
+ @Test
+ public void testFeatureLevelRecord() {
+ FeatureLevelRecord record1 = new FeatureLevelRecord()
+ .setName("metadata.version")
+ .setFeatureLevel((short) 3);
+ metadataNodeManager.handleMessage(record1);
+ assertEquals(
+ FeatureLevelRecordJsonConverter.write(record1, FeatureLevelRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
+ metadataNodeManager.getData().root().directory("features").file("metadata.version").contents());
+
+ FeatureLevelRecord record2 = new FeatureLevelRecord()
+ .setName("metadata.version")
+ .setFeatureLevel((short) 0);
+ metadataNodeManager.handleMessage(record2);
+ assertFalse(metadataNodeManager.getData().root().directory("features").children().containsKey("metadata.version"));
+ }
}