You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/10 21:08:02 UTC

[GitHub] [kafka] cmccabe commented on a change in pull request #10094: MINOR: Add the KIP-500 metadata shell

cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574078171



##########
File path: shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Maintains the in-memory metadata for the metadata tool.
+ */
+public final class MetadataNodeManager implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(MetadataNodeManager.class);
+
+    public static class Data {
+        private final DirectoryNode root = new DirectoryNode();
+        private String workingDirectory = "/";
+
+        public DirectoryNode root() {
+            return root;
+        }
+
+        public String workingDirectory() {
+            return workingDirectory;
+        }
+
+        public void setWorkingDirectory(String workingDirectory) {
+            this.workingDirectory = workingDirectory;
+        }
+    }
+
+    class LogListener implements MetaLogListener, RaftClient.Listener<ApiMessageAndVersion> {
+        @Override
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            try {
+                // TODO: handle lastOffset
+                while (reader.hasNext()) {
+                    BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+                    for (ApiMessageAndVersion messageAndVersion : batch.records()) {
+                        handleMessage(messageAndVersion.message());
+                    }
+                }
+            } finally {
+                reader.close();
+            }
+        }
+
+        @Override
+        public void handleCommits(long lastOffset, List<ApiMessage> messages) {
+            appendEvent("handleCommits", () -> {
+                log.error("handleCommits " + messages + " at offset " + lastOffset);
+                DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+                dir.create("offset").setContents(String.valueOf(lastOffset));
+                for (ApiMessage message : messages) {
+                    handleMessage(message);
+                }
+            }, null);
+        }
+
+        @Override
+        public void handleNewLeader(MetaLogLeader leader) {
+            appendEvent("handleNewLeader", () -> {
+                log.error("handleNewLeader " + leader);
+                DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+                dir.create("leader").setContents(leader.toString());
+            }, null);
+        }
+
+        @Override
+        public void handleClaim(int epoch) {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("RaftClient.Listener sent handleClaim(epoch=" + epoch + ")");
+        }
+
+        @Override
+        public void handleRenounce(long epoch) {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("MetaLogListener sent handleRenounce(epoch=" + epoch + ")");
+        }
+
+        @Override
+        public void handleResign() {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("MetaLogListener sent handleResign()");
+        }
+
+        @Override
+        public void beginShutdown() {
+            log.debug("MetaLogListener sent beginShutdown");
+        }
+    }
+
+    private final Data data = new Data();
+    private final LogListener logListener = new LogListener();
+    private final ObjectMapper objectMapper;
+    private final KafkaEventQueue queue;
+
+    public MetadataNodeManager() {
+        this.objectMapper = new ObjectMapper();
+        this.objectMapper.registerModule(new Jdk8Module());

Review comment:
       It's not used right now, I think, but in general it' something we should have since we use jdk8.  For the most part, the functionality provided is that it correctly handles Java's `Optional`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org