Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/09 23:27:38 UTC

[GitHub] [kafka] cmccabe opened a new pull request #10094: MINOR: Add the KIP-500 metadata shell

cmccabe opened a new pull request #10094:
URL: https://github.com/apache/kafka/pull/10094


   The Kafka metadata shell is a new command that lets users
   interactively examine the metadata stored in a KIP-500 cluster.
   
   It can read the metadata from the controllers directly, by connecting to
   them, or from a metadata snapshot on disk.  In the former case, the
   quorum voters must be specified by passing the --controllers flag; in
   the latter case, the snapshot file should be specified via --snapshot.
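
   A rough sketch of how the two modes could be told apart. The names
   `ShellSourceSketch`, `Mode`, and `chooseMode` are hypothetical and do not
   appear in this PR; rejecting the case where neither flag is given is an
   assumption, and `IllegalArgumentException` is used here only for
   illustration.

```java
// Hypothetical sketch: these names are illustrative, not taken from the PR.
final class ShellSourceSketch {
    enum Mode { CONTROLLERS, SNAPSHOT }

    // Picks the metadata source from the values of --controllers and --snapshot.
    // A null argument means the corresponding flag was not given.
    static Mode chooseMode(String controllers, String snapshotPath) {
        if (controllers != null && snapshotPath != null) {
            throw new IllegalArgumentException(
                "Specify either --controllers or --snapshot, not both.");
        }
        if (snapshotPath != null) {
            return Mode.SNAPSHOT;
        }
        if (controllers != null) {
            return Mode.CONTROLLERS;
        }
        // Assumption: with neither flag there is nothing to read from.
        throw new IllegalArgumentException(
            "Specify one of --controllers or --snapshot.");
    }
}
```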
   
   The metadata tool works by replaying the log and storing the state into
   in-memory nodes.  These nodes are presented in a fashion similar to
   filesystem directories.
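
   To make the replay idea concrete, here is a minimal, self-contained
   sketch. It is not the PR's `MetadataNode` code: `NodeTreeSketch`, `Node`,
   and `TopicEntry` are hypothetical names, and the `/topics/<name>/id`
   layout is an assumption. Only the `metadataQuorum/offset` file mirrors
   what the PR's listener writes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: none of these class names come from the PR.
final class NodeTreeSketch {
    // A node with children acts as a directory; a node with contents acts as a file.
    static final class Node {
        final Map<String, Node> children = new TreeMap<>();
        String contents;
    }

    // Stand-in for a committed metadata record (assumed shape).
    static final class TopicEntry {
        final String name;
        final String id;

        TopicEntry(String name, String id) {
            this.name = name;
            this.id = id;
        }
    }

    private final Node root = new Node();

    // Walks the path from the root; returns null if any component is missing.
    private Node find(String path) {
        Node node = root;
        for (String part : path.split("/")) {
            if (part.isEmpty()) continue;
            node = node.children.get(part);
            if (node == null) return null;
        }
        return node;
    }

    // Creates any missing directories along the path, then sets the file contents.
    void put(String path, String contents) {
        Node node = root;
        for (String part : path.split("/")) {
            if (part.isEmpty()) continue;
            node = node.children.computeIfAbsent(part, k -> new Node());
        }
        node.contents = contents;
    }

    // Replays one committed batch into the tree.
    void replay(long lastOffset, List<TopicEntry> batch) {
        put("/metadataQuorum/offset", String.valueOf(lastOffset));
        for (TopicEntry entry : batch) {
            put("/topics/" + entry.name + "/id", entry.id);
        }
    }

    // Lists child names in sorted order, like "ls"; empty if the path is missing.
    List<String> ls(String path) {
        Node node = find(path);
        return node == null ? new ArrayList<>() : new ArrayList<>(node.children.keySet());
    }

    // Returns file contents, like "cat"; null if the path is missing or a directory.
    String cat(String path) {
        Node node = find(path);
        return node == null ? null : node.contents;
    }
}
```

   After replaying a batch, `ls("/")` lists the top-level directories and
   `cat("/metadataQuorum/offset")` returns the last replayed offset as a
   string.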
   
   This PR currently includes the metalog/ directory, since it is a
   dependency of the metadata shell.  Eventually, however, we want to
   migrate to using the Raft API directly.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10094: MINOR: Add the KIP-500 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574122281



##########
File path: shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.jline.reader.Candidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Implements the cat command.
+ */
+public final class CatCommandHandler implements Commands.Handler {
+    private static final Logger log = LoggerFactory.getLogger(CatCommandHandler.class);
+
+    public final static Commands.Type TYPE = new CatCommandType();
+
+    public static class CatCommandType implements Commands.Type {
+        private CatCommandType() {
+        }
+
+        @Override
+        public String name() {
+            return "cat";
+        }
+
+        @Override
+        public String description() {
+            return "Show the contents of metadata nodes.";
+        }
+
+        @Override
+        public boolean shellOnly() {
+            return false;
+        }
+
+        @Override
+        public void addArguments(ArgumentParser parser) {
+            parser.addArgument("targets").
+                nargs("+").
+                help("The metadata nodes to display.");
+        }
+
+        @Override
+        public Commands.Handler createHandler(Namespace namespace) {
+            return new CatCommandHandler(namespace.getList("targets"));
+        }
+
+        @Override
+        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
+                                 List<Candidate> candidates) throws Exception {
+            CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
+                candidates);
+        }
+    }
+
+    private final List<String> targets;
+
+    public CatCommandHandler(List<String> targets) {
+        this.targets = targets;

Review comment:
       I think it shouldn't be needed here.  We only instantiate this in one place, and we expect the object to take ownership of whatever it is passed...





[GitHub] [kafka] cmccabe commented on a change in pull request #10094: MINOR: Add the KIP-500 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574078171



##########
File path: shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Maintains the in-memory metadata for the metadata tool.
+ */
+public final class MetadataNodeManager implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(MetadataNodeManager.class);
+
+    public static class Data {
+        private final DirectoryNode root = new DirectoryNode();
+        private String workingDirectory = "/";
+
+        public DirectoryNode root() {
+            return root;
+        }
+
+        public String workingDirectory() {
+            return workingDirectory;
+        }
+
+        public void setWorkingDirectory(String workingDirectory) {
+            this.workingDirectory = workingDirectory;
+        }
+    }
+
+    class LogListener implements MetaLogListener, RaftClient.Listener<ApiMessageAndVersion> {
+        @Override
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            try {
+                // TODO: handle lastOffset
+                while (reader.hasNext()) {
+                    BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+                    for (ApiMessageAndVersion messageAndVersion : batch.records()) {
+                        handleMessage(messageAndVersion.message());
+                    }
+                }
+            } finally {
+                reader.close();
+            }
+        }
+
+        @Override
+        public void handleCommits(long lastOffset, List<ApiMessage> messages) {
+            appendEvent("handleCommits", () -> {
+                log.error("handleCommits " + messages + " at offset " + lastOffset);
+                DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+                dir.create("offset").setContents(String.valueOf(lastOffset));
+                for (ApiMessage message : messages) {
+                    handleMessage(message);
+                }
+            }, null);
+        }
+
+        @Override
+        public void handleNewLeader(MetaLogLeader leader) {
+            appendEvent("handleNewLeader", () -> {
+                log.error("handleNewLeader " + leader);
+                DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+                dir.create("leader").setContents(leader.toString());
+            }, null);
+        }
+
+        @Override
+        public void handleClaim(int epoch) {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("RaftClient.Listener sent handleClaim(epoch=" + epoch + ")");
+        }
+
+        @Override
+        public void handleRenounce(long epoch) {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("MetaLogListener sent handleRenounce(epoch=" + epoch + ")");
+        }
+
+        @Override
+        public void handleResign() {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("MetaLogListener sent handleResign()");
+        }
+
+        @Override
+        public void beginShutdown() {
+            log.debug("MetaLogListener sent beginShutdown");
+        }
+    }
+
+    private final Data data = new Data();
+    private final LogListener logListener = new LogListener();
+    private final ObjectMapper objectMapper;
+    private final KafkaEventQueue queue;
+
+    public MetadataNodeManager() {
+        this.objectMapper = new ObjectMapper();
+        this.objectMapper.registerModule(new Jdk8Module());

Review comment:
       It's not used right now, I think, but in general it's something we should have since we use JDK 8.  For the most part, what it provides is correct handling of Java's `Optional`.





[GitHub] [kafka] cmccabe commented on pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#issuecomment-782461539


   ran test manually and committed from command line



[GitHub] [kafka] cmccabe commented on a change in pull request #10094: MINOR: Add the KIP-500 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574088810



##########
File path: metadata/src/main/resources/common/metadata/IsrChangeRecord.json
##########
@@ -28,6 +28,8 @@
     { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The lead replica, or -1 if there is no leader." },
     { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
-      "about": "An epoch that gets incremented each time we change the ISR." }
+      "about": "An epoch that gets incremented each time we change the partition leader." },
+    { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",

Review comment:
       This change is in the controller PR as well





[GitHub] [kafka] cmccabe commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r579433300



##########
File path: settings.gradle
##########
@@ -29,6 +29,7 @@ include 'clients',
     'log4j-appender',
     'metadata',
     'raft',
+    'shell',

Review comment:
       It would be nice to have it as just "shell" until we get another shell....





[GitHub] [kafka] hachikuji commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r578818603



##########
File path: bin/kafka-metadata-shell.sh
##########
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.shell.MetadataShell "$@"

Review comment:
       Or maybe an alternative is to put this under bin/metadata





[GitHub] [kafka] cmccabe commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r579432108



##########
File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of memory.
+ */
+public final class LocalLogManager implements MetaLogManager, AutoCloseable {

Review comment:
       Yes, this should not be present twice.  Fixed.





[GitHub] [kafka] cmccabe commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r579431376



##########
File path: core/src/main/scala/kafka/server/Server.scala
##########
@@ -29,6 +29,7 @@ trait Server {
 }
 
 object Server {
+  val metadataTopicName = "@metadata"

Review comment:
       yes, let's do that





[GitHub] [kafka] cmccabe commented on a change in pull request #10094: MINOR: Add the KIP-500 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574088270



##########
File path: core/src/main/scala/kafka/server/Server.scala
##########
@@ -29,6 +29,7 @@ trait Server {
 }
 
 object Server {
+  val metadataTopicName = "@metadata"

Review comment:
       RaftClient requires us to pass this as an argument, for now at least





[GitHub] [kafka] cmccabe commented on pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#issuecomment-780880646


   I rebased this on trunk and created a JIRA for it



[GitHub] [kafka] cmccabe commented on a change in pull request #10094: MINOR: Add the KIP-500 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574082124



##########
File path: shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaConfig;
+import kafka.server.MetaProperties;
+import kafka.server.Server;
+import kafka.tools.TerseFailure;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.compat.java8.OptionConverters;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+
+/**
+ * The Kafka metadata tool.
+ */
+public final class MetadataShell {
+    private static final Logger log = LoggerFactory.getLogger(MetadataShell.class);
+
+    public static class Builder {
+        private String controllers;
+        private String configPath;
+        private File tempDir;
+        private String snapshotPath;
+
+        public Builder setControllers(String controllers) {
+            this.controllers = controllers;
+            return this;
+        }
+
+        public Builder setConfigPath(String configPath) {
+            this.configPath = configPath;
+            return this;
+        }
+
+        public Builder setSnapshotPath(String snapshotPath) {
+            this.snapshotPath = snapshotPath;
+            return this;
+        }
+
+        public Builder setTempDir(File tempDir) {
+            this.tempDir = tempDir;
+            return this;
+        }
+
+        public MetadataShell build() throws Exception {
+            if (snapshotPath != null) {
+                if (controllers != null) {
+                    throw new RuntimeException("If you specify a snapshot path, you " +

Review comment:
       Yes, we eventually want snapshots in connected mode.  However, handling snapshots is (or will be) up to the RaftClient, so there won't be much to do in the metadata shell.  We wouldn't use the SnapshotReader class for that, since that class is oriented toward reading snapshots directly from disk.





[GitHub] [kafka] cmccabe commented on a change in pull request #10094: MINOR: Add the KIP-500 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574078790



##########
File path: shell/src/main/java/org/apache/kafka/shell/SnapshotReader.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * The Kafka metadata tool.

Review comment:
       fixed





[GitHub] [kafka] cmccabe commented on a change in pull request #10094: MINOR: Add the KIP-500 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574115310



##########
File path: metadata/src/main/resources/common/metadata/PartitionRecord.json
##########
@@ -34,6 +34,8 @@
     { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The lead replica, or -1 if there is no leader." },
     { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
-      "about": "An epoch that gets incremented each time we change the ISR." }
+      "about": "An epoch that gets incremented each time we change the partition leader." },

Review comment:
       This change is in the controller PR as well





[GitHub] [kafka] soarez commented on a change in pull request #10094: MINOR: Add the KIP-500 metadata shell

Posted by GitBox <gi...@apache.org>.
soarez commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574130615



##########
File path: shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.jline.reader.Candidate;
+import org.jline.reader.Completer;
+import org.jline.reader.EndOfFileException;
+import org.jline.reader.History;
+import org.jline.reader.LineReader;
+import org.jline.reader.LineReaderBuilder;
+import org.jline.reader.ParsedLine;
+import org.jline.reader.Parser;
+import org.jline.reader.UserInterruptException;
+import org.jline.reader.impl.DefaultParser;
+import org.jline.reader.impl.history.DefaultHistory;
+import org.jline.terminal.Terminal;
+import org.jline.terminal.TerminalBuilder;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+/**
+ * The Kafka metadata shell.
+ */
+public final class InteractiveShell implements AutoCloseable {
+    static class MetadataShellCompleter implements Completer {
+        private final MetadataNodeManager nodeManager;
+
+        MetadataShellCompleter(MetadataNodeManager nodeManager) {
+            this.nodeManager = nodeManager;
+        }
+
+        @Override
+        public void complete(LineReader reader, ParsedLine line, List<Candidate> candidates) {
+            if (line.words().size() == 0) {
+                CommandUtils.completeCommand("", candidates);
+            } else if (line.words().size() == 1) {
+                CommandUtils.completeCommand(line.words().get(0), candidates);
+            } else {
+                Iterator<String> iter = line.words().iterator();
+                String command = iter.next();
+                List<String> nextWords = new ArrayList<>();
+                while (iter.hasNext()) {
+                    nextWords.add(iter.next());
+                }
+                Commands.Type type = Commands.TYPES.get(command);
+                if (type == null) {
+                    return;
+                }
+                try {
+                    type.completeNext(nodeManager, nextWords, candidates);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    private final MetadataNodeManager nodeManager;
+    private final Terminal terminal;
+    private final Parser parser;
+    private final History history;
+    private final MetadataShellCompleter completer;
+    private final LineReader reader;
+
+    public InteractiveShell(MetadataNodeManager nodeManager) throws IOException {
+        this.nodeManager = nodeManager;
+        TerminalBuilder builder = TerminalBuilder.builder().
+            system(true).
+            nativeSignals(true);
+        this.terminal = builder.build();
+        this.parser = new DefaultParser();
+        this.history = new DefaultHistory();
+        this.completer = new MetadataShellCompleter(nodeManager);
+        this.reader = LineReaderBuilder.builder().
+            terminal(terminal).
+            parser(parser).
+            history(history).
+            completer(completer).
+            option(LineReader.Option.AUTO_FRESH_LINE, false).
+            build();
+    }
+
+    public void runMainLoop() throws Exception {
+        terminal.writer().println("[ Kafka Metadata Shell ]");
+        terminal.flush();
+        Commands commands = new Commands(true);
+        while (true) {
+            try {
+                reader.readLine(">> ");
+                ParsedLine parsedLine = reader.getParsedLine();
+                Commands.Handler handler = commands.parseCommand(parsedLine.words());
+                handler.run(Optional.of(this), terminal.writer(), nodeManager);
+                terminal.writer().flush();
+            } catch (UserInterruptException eof) {
+                // Handle ths user pressing Control-C.

Review comment:
       typo

##########
File path: metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT;
+import static org.apache.kafka.metalog.MockMetaLogManagerListener.LAST_COMMITTED_OFFSET;
+import static org.apache.kafka.metalog.MockMetaLogManagerListener.SHUTDOWN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+@Timeout(value = 40)
+public class LocalLogManagerTest {
+    private static final Logger log = LoggerFactory.getLogger(LocalLogManagerTest.class);
+
+    /**
+     * Test creating a LocalLogManager and closing it.
+     */
+    @Test
+    public void testCreateAndClose() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(1)) {
+            env.close();
+            assertEquals(null, env.firstError.get());
+        }
+    }
+
+    /**
+     * Test that the local log maanger will claim leadership.

Review comment:
       Typo

##########
File path: shell/src/main/java/org/apache/kafka/shell/CommandUtils.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.jline.reader.Candidate;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+/**
+ * Utility functions for command handlers.
+ */
+public final class CommandUtils {
+    /**
+     * Convert a list of paths into the effective list of paths which should be used.
+     * Empty strings will be removed.  If no paths are given, the current working
+     * directory will be used.
+     *
+     * @param paths     The input paths.  Non-null.
+     *
+     * @return          The output paths.
+     */
+    public static List<String> getEffectivePaths(List<String> paths) {
+        List<String> effectivePaths = new ArrayList<>();
+        for (String path : paths) {
+            if (!path.isEmpty()) {
+                effectivePaths.add(path);
+            }
+        }
+        if (effectivePaths.isEmpty()) {
+            effectivePaths.add(".");
+        }
+        return effectivePaths;
+    }
+
+    /**
+     * Generate a list of potential completions for a prefix of a command name.
+     *
+     * @param commandPrefix     The command prefix.  Non-null.
+     * @param candidates        The list to add the output completions to.
+     */
+    public static void completeCommand(String commandPrefix, List<Candidate> candidates) {
+        String command = Commands.TYPES.ceilingKey(commandPrefix);
+        while (true) {
+            if (command == null || !command.startsWith(commandPrefix)) {
+                return;
+            }

Review comment:
       ```suggestion
           while (command != null && command.startsWith(commandPrefix)) {
   ```
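
To illustrate the suggested loop shape, here is a minimal, self-contained sketch of prefix completion over a sorted map. It is not the PR's code: the class name `PrefixCompletionSketch` and the method `complete` are hypothetical, the map stands in for `Commands.TYPES`, and advancing with `higherKey` is an assumption, since the rest of the original loop is not shown in this hunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the completion logic; not part of the PR.
final class PrefixCompletionSketch {
    /** Collects every key of a sorted map that starts with the given prefix. */
    static List<String> complete(NavigableMap<String, ?> commands, String prefix) {
        List<String> matches = new ArrayList<>();
        // ceilingKey returns the smallest key >= prefix, or null if there is none.
        String command = commands.ceilingKey(prefix);
        // Keys sharing a prefix are contiguous in sorted order, so the loop can
        // stop at the first key that no longer starts with the prefix.
        while (command != null && command.startsWith(prefix)) {
            matches.add(command);
            // Assumed advance step: higherKey returns the next strictly greater key.
            command = commands.higherKey(command);
        }
        return matches;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> commands = new TreeMap<>();
        for (String name : new String[] {"cat", "cd", "exit", "find", "ls"}) {
            commands.put(name, name);
        }
        System.out.println(complete(commands, "c")); // prints [cat, cd]
        System.out.println(complete(commands, "x")); // prints []
    }
}
```

Folding both exit conditions into the `while` header removes the `while (true)` plus early `return`, which is the point of the suggestion above.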







[GitHub] [kafka] hachikuji commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r579516434



##########
File path: settings.gradle
##########
@@ -29,6 +29,7 @@ include 'clients',
     'log4j-appender',
     'metadata',
     'raft',
+    'shell',

Review comment:
       OK. I was mainly objecting to the generality of the name, since it is focused only on the metadata for KIP-500.







[GitHub] [kafka] cmccabe commented on a change in pull request #10094: MINOR: Add the KIP-500 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574087864



##########
File path: build.gradle
##########
@@ -1351,6 +1351,50 @@ project(':tools') {
   }
 }
 
+project(':shell') {

Review comment:
       Good point.  Let's call the command "kafka-metadata-shell.sh"
   
   For the internal names, I think "shell" is fine (and simple) until we have a second shell, if we ever do.  We can always rename it then since it's all internal classes, no API.







[GitHub] [kafka] hachikuji commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r578813351



##########
File path: metadata/src/main/java/org/apache/kafka/metalog/LocalLogManager.java
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of memory.
+ */
+public final class LocalLogManager implements MetaLogManager, AutoCloseable {

Review comment:
       We have this class already checked in under `metadata/src/test/java`. If it needs to be here, can we just move it?

##########
File path: settings.gradle
##########
@@ -29,6 +29,7 @@ include 'clients',
     'log4j-appender',
     'metadata',
     'raft',
+    'shell',

Review comment:
       I liked @mumrah's suggestion to call this module `metashell`.

##########
File path: core/src/main/scala/kafka/server/Server.scala
##########
@@ -29,6 +29,7 @@ trait Server {
 }
 
 object Server {
+  val metadataTopicName = "@metadata"

Review comment:
       Can we use `KafkaRaftServer.MetadataTopic` and remove this?
   

##########
File path: bin/kafka-metadata-shell.sh
##########
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.shell.MetadataShell "$@"

Review comment:
       With this script, the shell basically becomes a public API. I thought we were going to do a separate KIP? An alternative would be to put this under `shell/bin`. Or we could print a message at startup emphasizing that this is an experimental tool without any compatibility guarantees.







[GitHub] [kafka] hachikuji commented on a change in pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r578823423



##########
File path: bin/kafka-metadata-shell.sh
##########
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.shell.MetadataShell "$@"

Review comment:
       Actually, it looks like this is documented in the KIP.







[GitHub] [kafka] cmccabe closed pull request #10094: KAFKA-12334: Add the KIP-631 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe closed pull request #10094:
URL: https://github.com/apache/kafka/pull/10094


   





[GitHub] [kafka] mumrah commented on a change in pull request #10094: MINOR: Add the KIP-500 metadata shell

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574048761



##########
File path: shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaConfig;
+import kafka.server.MetaProperties;
+import kafka.server.Server;
+import kafka.tools.TerseFailure;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.compat.java8.OptionConverters;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+
+/**
+ * The Kafka metadata tool.
+ */
+public final class MetadataShell {
+    private static final Logger log = LoggerFactory.getLogger(MetadataShell.class);
+
+    public static class Builder {
+        private String controllers;
+        private String configPath;
+        private File tempDir;
+        private String snapshotPath;
+
+        public Builder setControllers(String controllers) {
+            this.controllers = controllers;
+            return this;
+        }
+
+        public Builder setConfigPath(String configPath) {
+            this.configPath = configPath;
+            return this;
+        }
+
+        public Builder setSnapshotPath(String snapshotPath) {
+            this.snapshotPath = snapshotPath;
+            return this;
+        }
+
+        public Builder setTempDir(File tempDir) {
+            this.tempDir = tempDir;
+            return this;
+        }
+
+        public MetadataShell build() throws Exception {
+            if (snapshotPath != null) {
+                if (controllers != null) {
+                    throw new RuntimeException("If you specify a snapshot path, you " +

Review comment:
       Do we eventually want to support snapshots + connected mode? Seems like in cases where we have a lot of records in the metadata log, this tool will take a while to initialize. 

##########
File path: shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.IsrChangeRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Maintains the in-memory metadata for the metadata tool.
+ */
+public final class MetadataNodeManager implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(MetadataNodeManager.class);
+
+    public static class Data {
+        private final DirectoryNode root = new DirectoryNode();
+        private String workingDirectory = "/";
+
+        public DirectoryNode root() {
+            return root;
+        }
+
+        public String workingDirectory() {
+            return workingDirectory;
+        }
+
+        public void setWorkingDirectory(String workingDirectory) {
+            this.workingDirectory = workingDirectory;
+        }
+    }
+
+    class LogListener implements MetaLogListener, RaftClient.Listener<ApiMessageAndVersion> {
+        @Override
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            try {
+                // TODO: handle lastOffset
+                while (reader.hasNext()) {
+                    BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+                    for (ApiMessageAndVersion messageAndVersion : batch.records()) {
+                        handleMessage(messageAndVersion.message());
+                    }
+                }
+            } finally {
+                reader.close();
+            }
+        }
+
+        @Override
+        public void handleCommits(long lastOffset, List<ApiMessage> messages) {
+            appendEvent("handleCommits", () -> {
+                log.error("handleCommits " + messages + " at offset " + lastOffset);
+                DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+                dir.create("offset").setContents(String.valueOf(lastOffset));
+                for (ApiMessage message : messages) {
+                    handleMessage(message);
+                }
+            }, null);
+        }
+
+        @Override
+        public void handleNewLeader(MetaLogLeader leader) {
+            appendEvent("handleNewLeader", () -> {
+                log.error("handleNewLeader " + leader);
+                DirectoryNode dir = data.root.mkdirs("metadataQuorum");
+                dir.create("leader").setContents(leader.toString());
+            }, null);
+        }
+
+        @Override
+        public void handleClaim(int epoch) {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("RaftClient.Listener sent handleClaim(epoch=" + epoch + ")");
+        }
+
+        @Override
+        public void handleRenounce(long epoch) {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("MetaLogListener sent handleRenounce(epoch=" + epoch + ")");
+        }
+
+        @Override
+        public void handleResign() {
+            // This shouldn't happen because we should never be the leader.
+            log.debug("MetaLogListener sent handleResign()");
+        }
+
+        @Override
+        public void beginShutdown() {
+            log.debug("MetaLogListener sent beginShutdown");
+        }
+    }
+
+    private final Data data = new Data();
+    private final LogListener logListener = new LogListener();
+    private final ObjectMapper objectMapper;
+    private final KafkaEventQueue queue;
+
+    public MetadataNodeManager() {
+        this.objectMapper = new ObjectMapper();
+        this.objectMapper.registerModule(new Jdk8Module());

Review comment:
       Out of curiosity, what is this module used for?

##########
File path: shell/src/main/java/org/apache/kafka/shell/SnapshotReader.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+/**
+ * The Kafka metadata tool.

Review comment:
       Is this javadoc accurate?

##########
File path: build.gradle
##########
@@ -1351,6 +1351,50 @@ project(':tools') {
   }
 }
 
+project(':shell') {

Review comment:
       Is "shell" the right name for this? I think "metadata shell" or "metashell" (like the branch name) is more descriptive of what it does. A Kafka "shell" makes me think of a tool that can do any of the normal command line things to a Kafka cluster (create topics, modify configs, etc). 
   
   Do we ever intend to make this utility more than a read-only metadata explorer?
   
   Maybe we can call it something like "kafka-metadata.sh" or "kafka-metadata-shell.sh"? I'm curious what others think here as well. 

##########
File path: metadata/src/main/resources/common/metadata/PartitionRecord.json
##########
@@ -34,6 +34,8 @@
     { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The lead replica, or -1 if there is no leader." },
     { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
-      "about": "An epoch that gets incremented each time we change the ISR." }
+      "about": "An epoch that gets incremented each time we change the partition leader." },

Review comment:
       Should this record change be part of this PR?

##########
File path: shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.jline.reader.Candidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Implements the cat command.
+ */
+public final class CatCommandHandler implements Commands.Handler {
+    private static final Logger log = LoggerFactory.getLogger(CatCommandHandler.class);
+
+    public final static Commands.Type TYPE = new CatCommandType();
+
+    public static class CatCommandType implements Commands.Type {
+        private CatCommandType() {
+        }
+
+        @Override
+        public String name() {
+            return "cat";
+        }
+
+        @Override
+        public String description() {
+            return "Show the contents of metadata nodes.";
+        }
+
+        @Override
+        public boolean shellOnly() {
+            return false;
+        }
+
+        @Override
+        public void addArguments(ArgumentParser parser) {
+            parser.addArgument("targets").
+                nargs("+").
+                help("The metadata nodes to display.");
+        }
+
+        @Override
+        public Commands.Handler createHandler(Namespace namespace) {
+            return new CatCommandHandler(namespace.getList("targets"));
+        }
+
+        @Override
+        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
+                                 List<Candidate> candidates) throws Exception {
+            CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
+                candidates);
+        }
+    }
+
+    private final List<String> targets;
+
+    public CatCommandHandler(List<String> targets) {
+        this.targets = targets;

Review comment:
       nit: we should copy `targets` into the private list rather than assigning the caller's reference directly
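
       For illustration, here is a minimal sketch of the defensive copy this nit asks for. The class and method names (DefensiveCopyExample, Handler, targets()) are hypothetical stand-ins, not the code from the PR, and wrapping the copy in an unmodifiable view is an extra assumption beyond what the comment requests.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class DefensiveCopyExample {
    /** Hypothetical stand-in for a command handler; not the class from the PR. */
    public static final class Handler {
        private final List<String> targets;

        public Handler(List<String> targets) {
            // Copy the caller's list so later changes to it cannot leak into this object,
            // then wrap the copy so users of targets() cannot modify it either.
            this.targets = Collections.unmodifiableList(new ArrayList<>(targets));
        }

        public List<String> targets() {
            return targets;
        }
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>();
        input.add("/brokers");
        Handler handler = new Handler(input);
        input.add("/topics"); // mutate the caller's list after construction
        System.out.println(handler.targets()); // the handler still sees only its own copy
    }
}
```

       With a plain assignment, the second add() would also be visible through the handler, because both references would point at the same list.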

##########
File path: metadata/src/main/resources/common/metadata/IsrChangeRecord.json
##########
@@ -28,6 +28,8 @@
     { "name": "Leader", "type": "int32", "versions": "0+", "default": "-1",
       "about": "The lead replica, or -1 if there is no leader." },
     { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1",
-      "about": "An epoch that gets incremented each time we change the ISR." }
+      "about": "An epoch that gets incremented each time we change the partition leader." },
+    { "name": "PartitionEpoch", "type": "int32", "versions": "0+", "default": "-1",

Review comment:
       Should this record change be part of this PR?

##########
File path: core/src/main/scala/kafka/server/Server.scala
##########
@@ -29,6 +29,7 @@ trait Server {
 }
 
 object Server {
+  val metadataTopicName = "@metadata"

Review comment:
       Is this used anywhere?

##########
File path: build.gradle
##########
@@ -1351,6 +1351,50 @@ project(':tools') {
   }
 }
 
+project(':shell') {
+  archivesBaseName = "kafka-shell"
+
+  dependencies {
+    compile libs.argparse4j
+    compile libs.jacksonDatabind
+    compile libs.jacksonJDK8Datatypes
+    compile libs.jline
+    compile libs.slf4jApi
+    compile project(':clients')
+    compile project(':core')
+    compile project(':log4j-appender')
+    compile project(':metadata')
+    compile project(':raft')
+
+    compile libs.jacksonJaxrsJsonProvider
+
+    testCompile project(':clients')
+    testCompile libs.junitJupiter
+    testCompile project(':clients').sourceSets.test.output

Review comment:
       Do we actually need this here? It would only be needed if there were a class in clients/src/test that we depend on. This kind of "output" dependency in Gradle can slow down the build, so we should avoid it if possible.







[GitHub] [kafka] cmccabe commented on a change in pull request #10094: MINOR: Add the KIP-500 metadata shell

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10094:
URL: https://github.com/apache/kafka/pull/10094#discussion_r574081130



##########
File path: build.gradle
##########
@@ -1351,6 +1351,50 @@ project(':tools') {
   }
 }
 
+project(':shell') {
+  archivesBaseName = "kafka-shell"
+
+  dependencies {
+    compile libs.argparse4j
+    compile libs.jacksonDatabind
+    compile libs.jacksonJDK8Datatypes
+    compile libs.jline
+    compile libs.slf4jApi
+    compile project(':clients')
+    compile project(':core')
+    compile project(':log4j-appender')
+    compile project(':metadata')
+    compile project(':raft')
+
+    compile libs.jacksonJaxrsJsonProvider
+
+    testCompile project(':clients')
+    testCompile libs.junitJupiter
+    testCompile project(':clients').sourceSets.test.output

Review comment:
       Hmm, it seems this isn't needed. I forget why I thought it was. I'll remove it.



