You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/27 01:14:27 UTC
[kafka] branch trunk updated: MINOR: fix kafka-metadata-shell.sh
(#10226)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5813446 MINOR: fix kafka-metadata-shell.sh (#10226)
5813446 is described below
commit 581344673047c126f2e5f2c00c00221d12a9b7d5
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Fri Feb 26 17:13:20 2021 -0800
MINOR: fix kafka-metadata-shell.sh (#10226)
* Fix CLASSPATH issues in the startup script
* Fix overly verbose log messages during loading
* Update to use the new MetadataRecordSerde (this is needed now that we
have a frame version)
* Fix initialization
Reviewers: Jason Gustafson <ja...@confluent.io>
---
bin/kafka-run-class.sh | 12 ++++++++++++
.../org/apache/kafka/shell/MetadataNodeManager.java | 4 ++--
.../main/java/org/apache/kafka/shell/MetadataShell.java | 8 +++++++-
.../java/org/apache/kafka/shell/SnapshotFileReader.java | 17 +++++------------
4 files changed, 26 insertions(+), 15 deletions(-)
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 0d32285..3889be7 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -135,6 +135,18 @@ do
CLASSPATH="$CLASSPATH":"$file"
done
+for file in "$base_dir"/shell/build/libs/kafka-shell*.jar;
+do
+ if should_include_file "$file"; then
+ CLASSPATH="$CLASSPATH":"$file"
+ fi
+done
+
+for dir in "$base_dir"/shell/build/dependant-libs-${SCALA_VERSION}*;
+do
+ CLASSPATH="$CLASSPATH:$dir/*"
+done
+
for file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
do
if should_include_file "$file"; then
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
index fafccfa..739e027 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
@@ -96,7 +96,7 @@ public final class MetadataNodeManager implements AutoCloseable {
@Override
public void handleCommits(long lastOffset, List<ApiMessage> messages) {
appendEvent("handleCommits", () -> {
- log.error("handleCommits " + messages + " at offset " + lastOffset);
+ log.debug("handleCommits " + messages + " at offset " + lastOffset);
DirectoryNode dir = data.root.mkdirs("metadataQuorum");
dir.create("offset").setContents(String.valueOf(lastOffset));
for (ApiMessage message : messages) {
@@ -108,7 +108,7 @@ public final class MetadataNodeManager implements AutoCloseable {
@Override
public void handleNewLeader(MetaLogLeader leader) {
appendEvent("handleNewLeader", () -> {
- log.error("handleNewLeader " + leader);
+ log.debug("handleNewLeader " + leader);
DirectoryNode dir = data.root.mkdirs("metadataQuorum");
dir.create("leader").setContents(leader.toString());
}, null);
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
index b701310..9ba70d8 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
@@ -54,6 +54,9 @@ public final class MetadataShell {
}
public MetadataShell build() throws Exception {
+ if (snapshotPath == null) {
+ throw new RuntimeException("You must supply the log path via --snapshot");
+ }
MetadataNodeManager nodeManager = null;
SnapshotFileReader reader = null;
try {
@@ -99,11 +102,15 @@ public final class MetadataShell {
}
if (args == null || args.isEmpty()) {
// Interactive mode.
+ System.out.println("Loading...");
+ waitUntilCaughtUp();
+ System.out.println("Starting...");
try (InteractiveShell shell = new InteractiveShell(nodeManager)) {
shell.runMainLoop();
}
} else {
// Non-interactive mode.
+ waitUntilCaughtUp();
Commands commands = new Commands(false);
try (PrintWriter writer = new PrintWriter(new BufferedWriter(
new OutputStreamWriter(System.out, StandardCharsets.UTF_8)))) {
@@ -150,7 +157,6 @@ public final class MetadataShell {
}
});
MetadataShell shell = builder.build();
- shell.waitUntilCaughtUp();
try {
shell.run(res.getList("command"));
} finally {
diff --git a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
index e566be6..907b4db 100644
--- a/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
+++ b/shell/src/main/java/org/apache/kafka/shell/SnapshotFileReader.java
@@ -18,7 +18,6 @@
package org.apache.kafka.shell;
import org.apache.kafka.common.message.LeaderChangeMessage;
-import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.record.ControlRecordType;
@@ -27,8 +26,10 @@ import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.slf4j.Logger;
@@ -53,6 +54,7 @@ public final class SnapshotFileReader implements AutoCloseable {
private final CompletableFuture<Void> caughtUpFuture;
private FileRecords fileRecords;
private Iterator<FileChannelRecordBatch> batchIterator;
+ private final MetadataRecordSerde serde = new MetadataRecordSerde();
public SnapshotFileReader(String snapshotPath, MetaLogListener listener) {
this.snapshotPath = snapshotPath;
@@ -140,17 +142,8 @@ public final class SnapshotFileReader implements AutoCloseable {
Record record = iter.next();
ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
try {
- int apiKey = accessor.readUnsignedVarint();
- if (apiKey > Short.MAX_VALUE || apiKey < 0) {
- throw new RuntimeException("Invalid apiKey value " + apiKey);
- }
- int apiVersion = accessor.readUnsignedVarint();
- if (apiVersion > Short.MAX_VALUE || apiVersion < 0) {
- throw new RuntimeException("Invalid apiVersion value " + apiVersion);
- }
- ApiMessage message = MetadataRecordType.fromId((short) apiKey).newMetadataRecord();
- message.read(accessor, (short) apiVersion);
- messages.add(message);
+ ApiMessageAndVersion messageAndVersion = serde.read(accessor, record.valueSize());
+ messages.add(messageAndVersion.message());
} catch (Throwable e) {
log.error("unable to read metadata record at offset {}", record.offset(), e);
}