You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/27 01:16:01 UTC

[kafka] branch 2.8 updated: MINOR: fix kafka-metadata-shell.sh (#10226)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 3f76293  MINOR: fix kafka-metadata-shell.sh (#10226)
3f76293 is described below

commit 3f762931b2edeb03132adda5e679035b33d5fa35
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Fri Feb 26 17:13:20 2021 -0800

    MINOR: fix kafka-metadata-shell.sh (#10226)
    
    * Fix CLASSPATH issues in the startup script
    
    * Fix overly verbose log messages during loading
    
    * Update to use the new MetadataRecordSerde (this is needed now that we
      have a frame version)
    
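    * Fix initialization (require a snapshot path via --snapshot, and wait
      until the snapshot is fully loaded before running commands)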
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 bin/kafka-run-class.sh                                  | 12 ++++++++++++
 .../org/apache/kafka/shell/MetadataNodeManager.java     |  4 ++--
 .../main/java/org/apache/kafka/shell/MetadataShell.java |  8 +++++++-
 .../java/org/apache/kafka/shell/SnapshotFileReader.java | 17 +++++------------
 4 files changed, 26 insertions(+), 15 deletions(-)

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 0d32285..3889be7 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -135,6 +135,18 @@ do
   CLASSPATH="$CLASSPATH":"$file"
 done
 
+for file in "$base_dir"/shell/build/libs/kafka-shell*.jar;
+do
+  if should_include_file "$file"; then
+    CLASSPATH="$CLASSPATH":"$file"
+  fi
+done
+
+for dir in "$base_dir"/shell/build/dependant-libs-${SCALA_VERSION}*;
+do
+  CLASSPATH="$CLASSPATH:$dir/*"
+done
+
 for file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
 do
   if should_include_file "$file"; then
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
index fafccfa..739e027 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -96,7 +96,7 @@ public final class MetadataNodeManager implements AutoCloseable {
         @Override
         public void handleCommits(long lastOffset, List<ApiMessage> messages) {
             appendEvent("handleCommits", () -> {
-                log.error("handleCommits " + messages + " at offset " + lastOffset);
+                log.debug("handleCommits " + messages + " at offset " + lastOffset);
                 DirectoryNode dir = data.root.mkdirs("metadataQuorum");
                 dir.create("offset").setContents(String.valueOf(lastOffset));
                 for (ApiMessage message : messages) {
@@ -108,7 +108,7 @@ public final class MetadataNodeManager implements AutoCloseable {
         @Override
         public void handleNewLeader(MetaLogLeader leader) {
             appendEvent("handleNewLeader", () -> {
-                log.error("handleNewLeader " + leader);
+                log.debug("handleNewLeader " + leader);
                 DirectoryNode dir = data.root.mkdirs("metadataQuorum");
                 dir.create("leader").setContents(leader.toString());
             }, null);
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
index b701310..9ba70d8 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
@@ -54,6 +54,9 @@ public final class MetadataShell {
         }
 
         public MetadataShell build() throws Exception {
+            if (snapshotPath == null) {
+                throw new RuntimeException("You must supply the log path via --snapshot");
+            }
             MetadataNodeManager nodeManager = null;
             SnapshotFileReader reader = null;
             try {
@@ -99,11 +102,15 @@ public final class MetadataShell {
         }
         if (args == null || args.isEmpty()) {
             // Interactive mode.
+            System.out.println("Loading...");
+            waitUntilCaughtUp();
+            System.out.println("Starting...");
             try (InteractiveShell shell = new InteractiveShell(nodeManager)) {
                 shell.runMainLoop();
             }
         } else {
             // Non-interactive mode.
+            waitUntilCaughtUp();
             Commands commands = new Commands(false);
             try (PrintWriter writer = new PrintWriter(new BufferedWriter(
                     new OutputStreamWriter(System.out, StandardCharsets.UTF_8)))) {
@@ -150,7 +157,6 @@ public final class MetadataShell {
                 }
             });
             MetadataShell shell = builder.build();
-            shell.waitUntilCaughtUp();
             try {
                 shell.run(res.getList("command"));
             } finally {
diff --git a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
index e566be6..907b4db 100644
--- a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
+++ b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.shell;
 
 import org.apache.kafka.common.message.LeaderChangeMessage;
-import org.apache.kafka.common.metadata.MetadataRecordType;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.record.ControlRecordType;
@@ -27,8 +26,10 @@ import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
 import org.apache.kafka.metalog.MetaLogLeader;
 import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
 import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.slf4j.Logger;
@@ -53,6 +54,7 @@ public final class SnapshotFileReader implements AutoCloseable {
     private final CompletableFuture<Void> caughtUpFuture;
     private FileRecords fileRecords;
     private Iterator<FileChannelRecordBatch> batchIterator;
+    private final MetadataRecordSerde serde = new MetadataRecordSerde();
 
     public SnapshotFileReader(String snapshotPath, MetaLogListener listener) {
         this.snapshotPath = snapshotPath;
@@ -140,17 +142,8 @@ public final class SnapshotFileReader implements AutoCloseable {
             Record record = iter.next();
             ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
             try {
-                int apiKey = accessor.readUnsignedVarint();
-                if (apiKey > Short.MAX_VALUE || apiKey < 0) {
-                    throw new RuntimeException("Invalid apiKey value " + apiKey);
-                }
-                int apiVersion = accessor.readUnsignedVarint();
-                if (apiVersion > Short.MAX_VALUE || apiVersion < 0) {
-                    throw new RuntimeException("Invalid apiVersion value " + apiVersion);
-                }
-                ApiMessage message = MetadataRecordType.fromId((short) apiKey).newMetadataRecord();
-                message.read(accessor, (short) apiVersion);
-                messages.add(message);
+                ApiMessageAndVersion messageAndVersion = serde.read(accessor, record.valueSize());
+                messages.add(messageAndVersion.message());
             } catch (Throwable e) {
                 log.error("unable to read metadata record at offset {}", record.offset(), e);
             }