You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/10/26 20:25:54 UTC
[kafka] branch trunk updated: MINOR: MetadataShell should handle
ClientQuotaRecord (#11339)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 37b3f8c MINOR: MetadataShell should handle ClientQuotaRecord (#11339)
37b3f8c is described below
commit 37b3f8c15b2a9f7d332179a33ab18ce4753a66fd
Author: dengziming <sw...@163.com>
AuthorDate: Wed Oct 27 04:23:20 2021 +0800
MINOR: MetadataShell should handle ClientQuotaRecord (#11339)
MetadataShell should handle ClientQuotaRecord. Also, add MetadataNodeManagerTest.
Reviewers: Colin P. McCabe <cm...@apache.org>
---
checkstyle/suppressions.xml | 2 +-
.../apache/kafka/shell/MetadataNodeManager.java | 24 +-
.../kafka/shell/MetadataNodeManagerTest.java | 279 +++++++++++++++++++++
3 files changed, 303 insertions(+), 2 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c3a6474..20224a4 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -257,7 +257,7 @@
<!-- Shell -->
<suppress checks="CyclomaticComplexity"
- files="(GlobComponent).java"/>
+ files="(GlobComponent|MetadataNodeManager).java"/>
<!-- Log4J-Appender -->
<suppress checks="CyclomaticComplexity"
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
index 4a1f71d..d4e9cd3 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
@@ -151,6 +152,11 @@ public final class MetadataNodeManager implements AutoCloseable {
return logListener;
}
+ // VisibleForTesting
+ Data getData() {
+ return data;
+ }
+
@Override
public void close() throws Exception {
queue.close();
@@ -182,7 +188,8 @@ public final class MetadataNodeManager implements AutoCloseable {
});
}
- private void handleMessage(ApiMessage message) {
+ // VisibleForTesting
+ void handleMessage(ApiMessage message) {
try {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
handleCommitImpl(type, message);
@@ -293,6 +300,21 @@ public final class MetadataNodeManager implements AutoCloseable {
data.root.rmrf("topicIds", record.topicId().toString());
break;
}
+ case CLIENT_QUOTA_RECORD: {
+ ClientQuotaRecord record = (ClientQuotaRecord) message;
+ DirectoryNode configsDirectory =
+ data.root.mkdirs("configs");
+ for (ClientQuotaRecord.EntityData entityData : record.entity()) {
+ String entityType = entityData.entityType();
+ String entityName = entityData.entityName();
+ DirectoryNode entityDirectory = configsDirectory.mkdirs(entityType).mkdirs(entityName);
+ if (record.remove())
+ entityDirectory.rmrf(record.key());
+ else
+ entityDirectory.create(record.key()).setContents(record.value() + "");
+ }
+ break;
+ }
default:
throw new RuntimeException("Unhandled metadata record type");
}
diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
new file mode 100644
index 0000000..605e801
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class MetadataNodeManagerTest {
+
+ private MetadataNodeManager metadataNodeManager;
+
+ @BeforeEach
+ public void setup() throws Exception {
+ metadataNodeManager = new MetadataNodeManager();
+ metadataNodeManager.setup();
+ }
+
+ @AfterEach
+ public void cleanup() throws Exception {
+ metadataNodeManager.close();
+ }
+
+ @Test
+ public void testRegisterBrokerRecordAndUnregisterBrokerRecord() {
+ // Register broker
+ RegisterBrokerRecord record = new RegisterBrokerRecord()
+ .setBrokerId(1)
+ .setBrokerEpoch(2);
+ metadataNodeManager.handleMessage(record);
+
+ assertEquals(record.toString(),
+ metadataNodeManager.getData().root().directory("brokers", "1").file("registration").contents());
+ assertEquals("true",
+ metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+ // Unregister broker
+ UnregisterBrokerRecord unregisterBrokerRecord = new UnregisterBrokerRecord()
+ .setBrokerId(1);
+ metadataNodeManager.handleMessage(unregisterBrokerRecord);
+ assertFalse(metadataNodeManager.getData().root().directory("brokers").children().containsKey("1"));
+ }
+
+ @Test
+ public void testTopicRecordAndRemoveTopicRecord() {
+ // Add topic
+ TopicRecord topicRecord = new TopicRecord()
+ .setName("topicName")
+ .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"));
+
+ metadataNodeManager.handleMessage(topicRecord);
+
+ assertEquals("topicName",
+ metadataNodeManager.getData().root().directory("topics", "topicName").file("name").contents());
+ assertEquals("GcaQDl2UTsCNs1p9s37XkQ",
+ metadataNodeManager.getData().root().directory("topics", "topicName").file("id").contents());
+ assertEquals("topicName",
+ metadataNodeManager.getData().root().directory("topicIds", "GcaQDl2UTsCNs1p9s37XkQ").file("name").contents());
+ assertEquals("GcaQDl2UTsCNs1p9s37XkQ",
+ metadataNodeManager.getData().root().directory("topicIds", "GcaQDl2UTsCNs1p9s37XkQ").file("id").contents());
+
+ // Remove topic
+ RemoveTopicRecord removeTopicRecord = new RemoveTopicRecord()
+ .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"));
+
+ metadataNodeManager.handleMessage(removeTopicRecord);
+
+ assertFalse(
+ metadataNodeManager.getData().root().directory("topicIds").children().containsKey("GcaQDl2UTsCNs1p9s37XkQ"));
+ assertFalse(
+ metadataNodeManager.getData().root().directory("topics").children().containsKey("topicName"));
+ }
+
+ @Test
+ public void testPartitionRecord() {
+ PartitionRecord record = new PartitionRecord()
+ .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
+ .setPartitionId(0)
+ .setLeaderEpoch(1)
+ .setReplicas(Arrays.asList(1, 2, 3))
+ .setIsr(Arrays.asList(1, 2, 3));
+
+ metadataNodeManager.handleMessage(record);
+ assertEquals(
+ PartitionRecordJsonConverter.write(record, PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
+ metadataNodeManager.getData().root().directory("topicIds", "GcaQDl2UTsCNs1p9s37XkQ", "0").file("data").contents());
+ }
+
+ @Test
+ public void testValidConfigRecord() {
+ checkValidConfigRecord(ConfigResource.Type.BROKER.id(), "broker");
+ checkValidConfigRecord(ConfigResource.Type.TOPIC.id(), "topic");
+ }
+
+ private void checkValidConfigRecord(byte resourceType, String typeString) {
+ ConfigRecord configRecord = new ConfigRecord()
+ .setResourceType(resourceType)
+ .setResourceName("0")
+ .setName("name")
+ .setValue("kraft");
+
+ metadataNodeManager.handleMessage(configRecord);
+ assertEquals("kraft",
+ metadataNodeManager.getData().root().directory("configs", typeString, "0").file("name").contents());
+
+ // null value indicates delete
+ configRecord.setValue(null);
+ metadataNodeManager.handleMessage(configRecord);
+ assertFalse(
+ metadataNodeManager.getData().root().directory("configs", typeString, "0").children().containsKey("name"));
+ }
+
+ @Test
+ public void testInvalidConfigRecord() {
+ checkInvalidConfigRecord(ConfigResource.Type.BROKER_LOGGER.id());
+ checkInvalidConfigRecord(ConfigResource.Type.UNKNOWN.id());
+ }
+
+ private void checkInvalidConfigRecord(byte resourceType) {
+ ConfigRecord configRecord = new ConfigRecord()
+ .setResourceType(resourceType)
+ .setResourceName("0")
+ .setName("name")
+ .setValue("kraft");
+ metadataNodeManager.handleMessage(configRecord);
+ assertFalse(metadataNodeManager.getData().root().children().containsKey("configs"));
+ }
+
+ @Test
+ public void testPartitionChangeRecord() {
+ PartitionRecord oldPartitionRecord = new PartitionRecord()
+ .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
+ .setPartitionId(0)
+ .setPartitionEpoch(0)
+ .setLeader(0)
+ .setLeaderEpoch(0)
+ .setIsr(Arrays.asList(0, 1, 2))
+ .setReplicas(Arrays.asList(0, 1, 2));
+
+ PartitionChangeRecord partitionChangeRecord = new PartitionChangeRecord()
+ .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
+ .setPartitionId(0)
+ .setLeader(NO_LEADER_CHANGE)
+ .setReplicas(Arrays.asList(0, 1, 2));
+
+ PartitionRecord newPartitionRecord = new PartitionRecord()
+ .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
+ .setPartitionId(0)
+ .setPartitionEpoch(1)
+ .setLeader(0)
+ .setLeaderEpoch(0)
+ .setIsr(Arrays.asList(0, 1, 2))
+ .setReplicas(Arrays.asList(0, 1, 2));
+
+ // Change nothing
+ checkPartitionChangeRecord(
+ oldPartitionRecord,
+ partitionChangeRecord,
+ newPartitionRecord
+ );
+
+ // Change isr
+ checkPartitionChangeRecord(
+ oldPartitionRecord,
+ partitionChangeRecord.duplicate().setIsr(Arrays.asList(0, 2)),
+ newPartitionRecord.duplicate().setIsr(Arrays.asList(0, 2))
+ );
+
+ // Change leader
+ checkPartitionChangeRecord(
+ oldPartitionRecord,
+ partitionChangeRecord.duplicate().setLeader(1),
+ newPartitionRecord.duplicate().setLeader(1).setLeaderEpoch(1)
+ );
+ }
+
+ private void checkPartitionChangeRecord(PartitionRecord oldPartitionRecord,
+ PartitionChangeRecord partitionChangeRecord,
+ PartitionRecord newPartitionRecord) {
+ metadataNodeManager.handleMessage(oldPartitionRecord);
+ metadataNodeManager.handleMessage(partitionChangeRecord);
+ assertEquals(
+ PartitionRecordJsonConverter.write(newPartitionRecord, PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
+ metadataNodeManager.getData().root()
+ .directory("topicIds", oldPartitionRecord.topicId().toString(), oldPartitionRecord.partitionId() + "")
+ .file("data").contents()
+ );
+ }
+
+ @Test
+ public void testUnfenceBrokerRecordAndFenceBrokerRecord() {
+ RegisterBrokerRecord record = new RegisterBrokerRecord()
+ .setBrokerId(1)
+ .setBrokerEpoch(2);
+ metadataNodeManager.handleMessage(record);
+
+ assertEquals("true",
+ metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+ UnfenceBrokerRecord unfenceBrokerRecord = new UnfenceBrokerRecord()
+ .setId(1)
+ .setEpoch(2);
+ metadataNodeManager.handleMessage(unfenceBrokerRecord);
+ assertEquals("false",
+ metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+ FenceBrokerRecord fenceBrokerRecord = new FenceBrokerRecord()
+ .setId(1)
+ .setEpoch(2);
+ metadataNodeManager.handleMessage(fenceBrokerRecord);
+ assertEquals("true",
+ metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+ }
+
+ @Test
+ public void testClientQuotaRecord() {
+ ClientQuotaRecord record = new ClientQuotaRecord()
+ .setEntity(Arrays.asList(
+ new ClientQuotaRecord.EntityData()
+ .setEntityType("user")
+ .setEntityName("kraft"),
+ new ClientQuotaRecord.EntityData()
+ .setEntityType("client")
+ .setEntityName("kstream")
+ ))
+ .setKey("producer_byte_rate")
+ .setValue(1000.0);
+
+ metadataNodeManager.handleMessage(record);
+
+ assertEquals("1000.0",
+ metadataNodeManager.getData().root().directory("configs", "user", "kraft").file("producer_byte_rate").contents());
+ assertEquals("1000.0",
+ metadataNodeManager.getData().root().directory("configs", "client", "kstream").file("producer_byte_rate").contents());
+
+ metadataNodeManager.handleMessage(record.setRemove(true));
+
+ assertFalse(
+ metadataNodeManager.getData().root().directory("configs", "user", "kraft").children().containsKey("producer_byte_rate"));
+ assertFalse(
+ metadataNodeManager.getData().root().directory("configs", "client", "kstream").children().containsKey("producer_byte_rate"));
+
+ }
+}