You are viewing a plain text version of this content. The canonical (HTML) version is available at the original archive link.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/10/26 20:25:54 UTC

[kafka] branch trunk updated: MINOR: MetadataShell should handle ClientQuotaRecord (#11339)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 37b3f8c  MINOR: MetadataShell should handle ClientQuotaRecord (#11339)
37b3f8c is described below

commit 37b3f8c15b2a9f7d332179a33ab18ce4753a66fd
Author: dengziming <sw...@163.com>
AuthorDate: Wed Oct 27 04:23:20 2021 +0800

    MINOR: MetadataShell should handle ClientQuotaRecord (#11339)
    
    MetadataShell should handle ClientQuotaRecord. Also, add MetadataNodeManagerTest.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../apache/kafka/shell/MetadataNodeManager.java    |  24 +-
 .../kafka/shell/MetadataNodeManagerTest.java       | 279 +++++++++++++++++++++
 3 files changed, 303 insertions(+), 2 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c3a6474..20224a4 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -257,7 +257,7 @@
 
     <!-- Shell -->
     <suppress checks="CyclomaticComplexity"
-              files="(GlobComponent).java"/>
+              files="(GlobComponent|MetadataNodeManager).java"/>
 
     <!-- Log4J-Appender -->
     <suppress checks="CyclomaticComplexity"
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
index 4a1f71d..d4e9cd3 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.MetadataRecordType;
@@ -151,6 +152,11 @@ public final class MetadataNodeManager implements AutoCloseable {
         return logListener;
     }
 
+    // VisibleForTesting
+    Data getData() {
+        return data;
+    }
+
     @Override
     public void close() throws Exception {
         queue.close();
@@ -182,7 +188,8 @@ public final class MetadataNodeManager implements AutoCloseable {
         });
     }
 
-    private void handleMessage(ApiMessage message) {
+    // VisibleForTesting
+    void handleMessage(ApiMessage message) {
         try {
             MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
             handleCommitImpl(type, message);
@@ -293,6 +300,21 @@ public final class MetadataNodeManager implements AutoCloseable {
                 data.root.rmrf("topicIds", record.topicId().toString());
                 break;
             }
+            case CLIENT_QUOTA_RECORD: {
+                ClientQuotaRecord record = (ClientQuotaRecord) message;
+                DirectoryNode configsDirectory =
+                    data.root.mkdirs("configs");
+                for (ClientQuotaRecord.EntityData entityData : record.entity()) {
+                    String entityType = entityData.entityType();
+                    String entityName = entityData.entityName();
+                    DirectoryNode entityDirectory = configsDirectory.mkdirs(entityType).mkdirs(entityName);
+                    if (record.remove())
+                        entityDirectory.rmrf(record.key());
+                    else
+                        entityDirectory.create(record.key()).setContents(record.value() + "");
+                }
+                break;
+            }
             default:
                 throw new RuntimeException("Unhandled metadata record type");
         }
diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
new file mode 100644
index 0000000..605e801
--- /dev/null
+++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class MetadataNodeManagerTest {
+
+    private MetadataNodeManager metadataNodeManager;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        metadataNodeManager = new MetadataNodeManager();
+        metadataNodeManager.setup();
+    }
+
+    @AfterEach
+    public void cleanup() throws Exception {
+        metadataNodeManager.close();
+    }
+
+    @Test
+    public void testRegisterBrokerRecordAndUnregisterBrokerRecord() {
+        // Register broker
+        RegisterBrokerRecord record = new RegisterBrokerRecord()
+            .setBrokerId(1)
+            .setBrokerEpoch(2);
+        metadataNodeManager.handleMessage(record);
+
+        assertEquals(record.toString(),
+            metadataNodeManager.getData().root().directory("brokers", "1").file("registration").contents());
+        assertEquals("true",
+            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+        // Unregister broker
+        UnregisterBrokerRecord unregisterBrokerRecord = new UnregisterBrokerRecord()
+            .setBrokerId(1);
+        metadataNodeManager.handleMessage(unregisterBrokerRecord);
+        assertFalse(metadataNodeManager.getData().root().directory("brokers").children().containsKey("1"));
+    }
+
+    @Test
+    public void testTopicRecordAndRemoveTopicRecord() {
+        // Add topic
+        TopicRecord topicRecord = new TopicRecord()
+            .setName("topicName")
+            .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"));
+
+        metadataNodeManager.handleMessage(topicRecord);
+
+        assertEquals("topicName",
+            metadataNodeManager.getData().root().directory("topics", "topicName").file("name").contents());
+        assertEquals("GcaQDl2UTsCNs1p9s37XkQ",
+            metadataNodeManager.getData().root().directory("topics", "topicName").file("id").contents());
+        assertEquals("topicName",
+            metadataNodeManager.getData().root().directory("topicIds", "GcaQDl2UTsCNs1p9s37XkQ").file("name").contents());
+        assertEquals("GcaQDl2UTsCNs1p9s37XkQ",
+            metadataNodeManager.getData().root().directory("topicIds", "GcaQDl2UTsCNs1p9s37XkQ").file("id").contents());
+
+        // Remove topic
+        RemoveTopicRecord removeTopicRecord = new RemoveTopicRecord()
+            .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"));
+
+        metadataNodeManager.handleMessage(removeTopicRecord);
+
+        assertFalse(
+            metadataNodeManager.getData().root().directory("topicIds").children().containsKey("GcaQDl2UTsCNs1p9s37XkQ"));
+        assertFalse(
+            metadataNodeManager.getData().root().directory("topics").children().containsKey("topicName"));
+    }
+
+    @Test
+    public void testPartitionRecord() {
+        PartitionRecord record = new PartitionRecord()
+            .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
+            .setPartitionId(0)
+            .setLeaderEpoch(1)
+            .setReplicas(Arrays.asList(1, 2, 3))
+            .setIsr(Arrays.asList(1, 2, 3));
+
+        metadataNodeManager.handleMessage(record);
+        assertEquals(
+            PartitionRecordJsonConverter.write(record, PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
+            metadataNodeManager.getData().root().directory("topicIds", "GcaQDl2UTsCNs1p9s37XkQ", "0").file("data").contents());
+    }
+
+    @Test
+    public void testValidConfigRecord() {
+        checkValidConfigRecord(ConfigResource.Type.BROKER.id(), "broker");
+        checkValidConfigRecord(ConfigResource.Type.TOPIC.id(), "topic");
+    }
+
+    private void checkValidConfigRecord(byte resourceType, String typeString) {
+        ConfigRecord configRecord = new ConfigRecord()
+            .setResourceType(resourceType)
+            .setResourceName("0")
+            .setName("name")
+            .setValue("kraft");
+
+        metadataNodeManager.handleMessage(configRecord);
+        assertEquals("kraft",
+            metadataNodeManager.getData().root().directory("configs", typeString, "0").file("name").contents());
+
+        // null value indicates delete
+        configRecord.setValue(null);
+        metadataNodeManager.handleMessage(configRecord);
+        assertFalse(
+            metadataNodeManager.getData().root().directory("configs", typeString, "0").children().containsKey("name"));
+    }
+
+    @Test
+    public void testInvalidConfigRecord() {
+        checkInvalidConfigRecord(ConfigResource.Type.BROKER_LOGGER.id());
+        checkInvalidConfigRecord(ConfigResource.Type.UNKNOWN.id());
+    }
+
+    private void checkInvalidConfigRecord(byte resourceType) {
+        ConfigRecord configRecord = new ConfigRecord()
+            .setResourceType(resourceType)
+            .setResourceName("0")
+            .setName("name")
+            .setValue("kraft");
+        metadataNodeManager.handleMessage(configRecord);
+        assertFalse(metadataNodeManager.getData().root().children().containsKey("configs"));
+    }
+
+    @Test
+    public void testPartitionChangeRecord() {
+        PartitionRecord oldPartitionRecord = new PartitionRecord()
+            .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
+            .setPartitionId(0)
+            .setPartitionEpoch(0)
+            .setLeader(0)
+            .setLeaderEpoch(0)
+            .setIsr(Arrays.asList(0, 1, 2))
+            .setReplicas(Arrays.asList(0, 1, 2));
+
+        PartitionChangeRecord partitionChangeRecord = new PartitionChangeRecord()
+            .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
+            .setPartitionId(0)
+            .setLeader(NO_LEADER_CHANGE)
+            .setReplicas(Arrays.asList(0, 1, 2));
+
+        PartitionRecord newPartitionRecord = new PartitionRecord()
+            .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
+            .setPartitionId(0)
+            .setPartitionEpoch(1)
+            .setLeader(0)
+            .setLeaderEpoch(0)
+            .setIsr(Arrays.asList(0, 1, 2))
+            .setReplicas(Arrays.asList(0, 1, 2));
+
+        // Change nothing
+        checkPartitionChangeRecord(
+            oldPartitionRecord,
+            partitionChangeRecord,
+            newPartitionRecord
+        );
+
+        // Change isr
+        checkPartitionChangeRecord(
+            oldPartitionRecord,
+            partitionChangeRecord.duplicate().setIsr(Arrays.asList(0, 2)),
+            newPartitionRecord.duplicate().setIsr(Arrays.asList(0, 2))
+        );
+
+        // Change leader
+        checkPartitionChangeRecord(
+            oldPartitionRecord,
+            partitionChangeRecord.duplicate().setLeader(1),
+            newPartitionRecord.duplicate().setLeader(1).setLeaderEpoch(1)
+        );
+    }
+
+    private void checkPartitionChangeRecord(PartitionRecord oldPartitionRecord,
+                                           PartitionChangeRecord partitionChangeRecord,
+                                           PartitionRecord newPartitionRecord) {
+        metadataNodeManager.handleMessage(oldPartitionRecord);
+        metadataNodeManager.handleMessage(partitionChangeRecord);
+        assertEquals(
+            PartitionRecordJsonConverter.write(newPartitionRecord, PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
+            metadataNodeManager.getData().root()
+                .directory("topicIds", oldPartitionRecord.topicId().toString(), oldPartitionRecord.partitionId() + "")
+                .file("data").contents()
+        );
+    }
+
+    @Test
+    public void testUnfenceBrokerRecordAndFenceBrokerRecord() {
+        RegisterBrokerRecord record = new RegisterBrokerRecord()
+            .setBrokerId(1)
+            .setBrokerEpoch(2);
+        metadataNodeManager.handleMessage(record);
+
+        assertEquals("true",
+            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+        UnfenceBrokerRecord unfenceBrokerRecord = new UnfenceBrokerRecord()
+            .setId(1)
+            .setEpoch(2);
+        metadataNodeManager.handleMessage(unfenceBrokerRecord);
+        assertEquals("false",
+            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+
+        FenceBrokerRecord fenceBrokerRecord = new FenceBrokerRecord()
+            .setId(1)
+            .setEpoch(2);
+        metadataNodeManager.handleMessage(fenceBrokerRecord);
+        assertEquals("true",
+            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
+    }
+
+    @Test
+    public void testClientQuotaRecord() {
+        ClientQuotaRecord record = new ClientQuotaRecord()
+            .setEntity(Arrays.asList(
+                    new ClientQuotaRecord.EntityData()
+                        .setEntityType("user")
+                        .setEntityName("kraft"),
+                    new ClientQuotaRecord.EntityData()
+                        .setEntityType("client")
+                        .setEntityName("kstream")
+                ))
+            .setKey("producer_byte_rate")
+            .setValue(1000.0);
+
+        metadataNodeManager.handleMessage(record);
+
+        assertEquals("1000.0",
+            metadataNodeManager.getData().root().directory("configs", "user", "kraft").file("producer_byte_rate").contents());
+        assertEquals("1000.0",
+            metadataNodeManager.getData().root().directory("configs", "client", "kstream").file("producer_byte_rate").contents());
+
+        metadataNodeManager.handleMessage(record.setRemove(true));
+
+        assertFalse(
+            metadataNodeManager.getData().root().directory("configs", "user", "kraft").children().containsKey("producer_byte_rate"));
+        assertFalse(
+            metadataNodeManager.getData().root().directory("configs", "client", "kstream").children().containsKey("producer_byte_rate"));
+
+    }
+}