You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/05/19 17:24:49 UTC

[kafka] 01/01: MINOR: Create the MetadataNode classes to introspect MetadataImage

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch printer
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 86b01c7dcf10926fdc56db2f926605092e053266
Author: Colin P. McCabe <cm...@apache.org>
AuthorDate: Thu May 4 23:57:10 2023 -0700

    MINOR: Create the MetadataNode classes to introspect MetadataImage
    
    Metadata image classes such as MetadataImage, ClusterImage, FeaturesImage, and so forth contain
    numerous sub-images. This PR adds a structured way of traversing those sub-images. This is useful
    for the metadata shell, and also for implementing toString functions.
    
    In both cases, the previous solution was suboptimal. The metadata shell was previously implemented
    in an ad-hoc way by mutating text-based tree nodes when records were replayed. This was difficult
    to keep in sync with changes to the record types (for example, we forgot to do this for SCRAM). It
    was also pretty low-level, being done at a level below that of the image classes. For toString, it
    was previously difficult to keep the implementations consistent while also supporting both
    redacted and non-redacted output.
    
    The metadata shell directory was getting crowded since we never had subdirectories for it. This PR
    creates glob/, command/, node/, and state/ directories to keep things better organized.
---
 checkstyle/import-control.xml                      |   3 +
 .../java/org/apache/kafka/image/AclsImage.java     |   6 +-
 .../org/apache/kafka/image/ClientQuotaImage.java   |   6 +-
 .../org/apache/kafka/image/ClientQuotasImage.java  |   6 +-
 .../java/org/apache/kafka/image/ClusterImage.java  |   6 +-
 .../org/apache/kafka/image/ConfigurationDelta.java |   2 +-
 .../org/apache/kafka/image/ConfigurationImage.java |  21 +-
 .../apache/kafka/image/ConfigurationsDelta.java    |   9 +-
 .../apache/kafka/image/ConfigurationsImage.java    |   6 +-
 .../java/org/apache/kafka/image/FeaturesImage.java |   8 +-
 .../java/org/apache/kafka/image/MetadataImage.java |  13 +-
 .../org/apache/kafka/image/ProducerIdsDelta.java   |   2 +-
 .../org/apache/kafka/image/ProducerIdsImage.java   |  16 +-
 .../java/org/apache/kafka/image/ScramImage.java    |  22 +-
 .../java/org/apache/kafka/image/TopicImage.java    |   7 +-
 .../java/org/apache/kafka/image/TopicsImage.java   |  14 +-
 .../apache/kafka/image/node/AclsImageByIdNode.java |  64 +++
 .../org/apache/kafka/image/node/AclsImageNode.java |  43 +-
 .../kafka/image/node/ClientQuotaImageNode.java     |  35 +-
 .../kafka/image/node/ClientQuotasImageNode.java    | 157 +++++++
 .../apache/kafka/image/node/ClusterImageNode.java  |  62 +++
 .../kafka/image/node/ConfigurationImageNode.java   |  62 +++
 .../kafka/image/node/ConfigurationsImageNode.java  |  80 ++++
 .../apache/kafka/image/node/FeaturesImageNode.java |  81 ++++
 .../apache/kafka/image/node/MetadataImageNode.java |  73 ++++
 .../apache/kafka/image/node/MetadataLeafNode.java  |  27 +-
 .../org/apache/kafka/image/node/MetadataNode.java  |  69 ++++
 .../kafka/image/node/ProducerIdsImageNode.java     |  43 +-
 .../apache/kafka/image/node/ProvenanceNode.java    |  57 +++
 .../kafka/image/node/ScramCredentialDataNode.java  |  73 ++++
 .../apache/kafka/image/node/ScramImageNode.java    |  63 +++
 .../kafka/image/node/ScramMechanismNode.java       |  36 +-
 .../apache/kafka/image/node/TopicImageNode.java    |  66 +++
 .../kafka/image/node/TopicsImageByIdNode.java      |  64 +++
 .../kafka/image/node/TopicsImageByNameNode.java    |  41 +-
 .../apache/kafka/image/node/TopicsImageNode.java   |  56 +++
 .../image/node/printer/MetadataNodePrinter.java    |  39 +-
 .../printer/MetadataNodeRedactionCriteria.java     |  85 ++++
 .../kafka/image/node/printer/NodeStringifier.java  |  67 +++
 .../metadata/migration/KRaftMigrationZkWriter.java |   2 +-
 .../kafka/metadata/util/SnapshotFileReader.java    |  12 +-
 .../kafka/image/ConfigurationsImageTest.java       |   8 +-
 .../image/node/ClientQuotasImageNodeTest.java      | 134 ++++++
 .../image/node/ConfigurationImageNodeTest.java     |  77 ++++
 .../image/node/ConfigurationsImageNodeTest.java    |  86 ++++
 .../image/node/ScramCredentialDataNodeTest.java    |  64 +++
 .../printer/MetadataNodeRedactionCriteriaTest.java | 114 ++++++
 .../image/node/printer/NodeStringifierTest.java    |  55 +++
 .../org/apache/kafka/shell/InteractiveShell.java   |  23 +-
 .../java/org/apache/kafka/shell/MetadataNode.java  | 140 -------
 .../apache/kafka/shell/MetadataNodeManager.java    | 406 -------------------
 .../java/org/apache/kafka/shell/MetadataShell.java | 135 ++++--
 .../shell/{ => command}/CatCommandHandler.java     |  41 +-
 .../shell/{ => command}/CdCommandHandler.java      |  49 +--
 .../kafka/shell/{ => command}/CommandUtils.java    |  38 +-
 .../apache/kafka/shell/{ => command}/Commands.java |  42 +-
 .../{ => command}/ErroneousCommandHandler.java     |  13 +-
 .../shell/{ => command}/ExitCommandHandler.java    |  19 +-
 .../shell/{ => command}/FindCommandHandler.java    |  45 +-
 .../shell/{ => command}/HelpCommandHandler.java    |  19 +-
 .../shell/{ => command}/HistoryCommandHandler.java |  19 +-
 .../shell/{ => command}/LsCommandHandler.java      |  37 +-
 .../shell/{ => command}/ManCommandHandler.java     |  19 +-
 .../shell/{ => command}/NoOpCommandHandler.java    |  13 +-
 .../shell/{ => command}/PwdCommandHandler.java     |  23 +-
 .../TreeCommandHandler.java}                       |  59 +--
 .../kafka/shell/{ => glob}/GlobComponent.java      |   2 +-
 .../apache/kafka/shell/{ => glob}/GlobVisitor.java |  50 ++-
 .../apache/kafka/shell/node/LocalShellNode.java    |  62 +++
 .../org/apache/kafka/shell/node/RootShellNode.java |  52 +++
 .../kafka/shell/node/printer/ShellNodePrinter.java |  67 +++
 .../kafka/shell/state/MetadataShellPublisher.java  |  54 +++
 .../kafka/shell/state/MetadataShellState.java      |  57 +++
 .../org/apache/kafka/shell/GlobComponentTest.java  |  76 ----
 .../kafka/shell/MetadataNodeManagerTest.java       | 451 ---------------------
 .../org/apache/kafka/shell/MetadataNodeTest.java   |  73 ----
 .../kafka/shell/{ => command}/CommandTest.java     |   4 +-
 .../shell/{ => command}/CommandUtilsTest.java      |   6 +-
 .../shell/{ => command}/LsCommandHandlerTest.java  |   6 +-
 .../kafka/shell/{ => glob}/GlobVisitorTest.java    |  90 +++-
 80 files changed, 2717 insertions(+), 1615 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 4fc539d6067..3096ba80d1e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -275,7 +275,10 @@
     <allow pkg="org.apache.kafka.queue"/>
     <allow pkg="org.apache.kafka.raft"/>
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.fault" />
     <allow pkg="org.apache.kafka.shell"/>
+    <allow pkg="org.apache.kafka.image"/>
+    <allow pkg="org.apache.kafka.image.loader"/>
     <allow pkg="org.apache.kafka.snapshot"/>
     <allow pkg="org.jline"/>
     <allow pkg="scala.compat"/>
diff --git a/metadata/src/main/java/org/apache/kafka/image/AclsImage.java b/metadata/src/main/java/org/apache/kafka/image/AclsImage.java
index e40ba9e8aa5..371abbbb35b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/AclsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/AclsImage.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.node.AclsImageByIdNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.metadata.authorizer.StandardAcl;
@@ -26,7 +27,6 @@ import org.apache.kafka.metadata.authorizer.StandardAclWithId;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.stream.Collectors;
 
 
 /**
@@ -76,8 +76,6 @@ public final class AclsImage {
 
     @Override
     public String toString() {
-        return "AclsImage(" + acls.values().stream().
-            map(a -> a.toString()).
-            collect(Collectors.joining(", ")) + ")";
+        return new AclsImageByIdNode(this).stringify();
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java b/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
index 6e0b84bbfbb..6c7bae44f33 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueDat
 import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.image.node.ClientQuotaImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 
@@ -31,7 +32,6 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 
 /**
@@ -114,8 +114,6 @@ public final class ClientQuotaImage {
 
     @Override
     public String toString() {
-        return "ClientQuotaImage(quotas=" + quotas.entrySet().stream().
-            map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) +
-            ")";
+        return new ClientQuotaImageNode(this).stringify();
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
index 3eb3e3c4e95..4e539f8a488 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
 import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
 import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.image.node.ClientQuotasImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 
@@ -34,7 +35,6 @@ import java.util.Map.Entry;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID;
 import static org.apache.kafka.common.quota.ClientQuotaEntity.IP;
@@ -188,8 +188,6 @@ public final class ClientQuotasImage {
 
     @Override
     public String toString() {
-        return "ClientQuotasImage(entities=" + entities.entrySet().stream().
-            map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) +
-            ")";
+        return new ClientQuotasImageNode(this).stringify();
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
index 7563657e9a4..253bd193ffe 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterImage.java
@@ -17,13 +17,14 @@
 
 package org.apache.kafka.image;
 
+import org.apache.kafka.image.node.ClusterImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.metadata.BrokerRegistration;
 
 import java.util.Collections;
 import java.util.Map;
-import java.util.stream.Collectors;
+
 
 /**
  * Represents the cluster in the metadata image.
@@ -75,7 +76,6 @@ public final class ClusterImage {
 
     @Override
     public String toString() {
-        return brokers.entrySet().stream().
-            map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", "));
+        return new ClusterImageNode(this).stringify();
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
index 677f764b831..dc550d8c72a 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
@@ -72,7 +72,7 @@ public final class ConfigurationDelta {
                 }
             }
         }
-        return new ConfigurationImage(newData);
+        return new ConfigurationImage(image.resource(), newData);
     }
 
     @Override
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
index bf74bb1aeb7..0e10579f0fb 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationImage.java
@@ -19,6 +19,7 @@ package org.apache.kafka.image;
 
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.image.node.ConfigurationImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 
@@ -26,8 +27,6 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.stream.Collectors;
-
 
 
 /**
@@ -36,15 +35,23 @@ import java.util.stream.Collectors;
  * This class is thread-safe.
  */
 public final class ConfigurationImage {
-    public static final ConfigurationImage EMPTY = new ConfigurationImage(Collections.emptyMap());
+    private final ConfigResource resource;
 
     private final Map<String, String> data;
 
-    public ConfigurationImage(Map<String, String> data) {
+    public ConfigurationImage(
+        ConfigResource resource,
+        Map<String, String> data
+    ) {
+        this.resource = resource;
         this.data = data;
     }
 
-    Map<String, String> data() {
+    public ConfigResource resource() {
+        return resource;
+    }
+
+    public Map<String, String> data() {
         return data;
     }
 
@@ -90,8 +97,6 @@ public final class ConfigurationImage {
 
     @Override
     public String toString() {
-        return "ConfigurationImage(data=" + data.entrySet().stream().
-            map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) +
-            ")";
+        return new ConfigurationImageNode(this).stringify();
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
index 2a4bf1a1ca2..eab0505e332 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.server.common.MetadataVersion;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -60,8 +61,8 @@ public final class ConfigurationsDelta {
     public void replay(ConfigRecord record) {
         ConfigResource resource =
             new ConfigResource(Type.forId(record.resourceType()), record.resourceName());
-        ConfigurationImage configImage =
-            image.resourceData().getOrDefault(resource, ConfigurationImage.EMPTY);
+        ConfigurationImage configImage = image.resourceData().getOrDefault(resource,
+                new ConfigurationImage(resource, Collections.emptyMap()));
         ConfigurationDelta delta = changes.computeIfAbsent(resource,
             __ -> new ConfigurationDelta(configImage));
         delta.replay(record);
@@ -70,8 +71,8 @@ public final class ConfigurationsDelta {
     public void replay(RemoveTopicRecord record, String topicName) {
         ConfigResource resource =
             new ConfigResource(Type.TOPIC, topicName);
-        ConfigurationImage configImage =
-            image.resourceData().getOrDefault(resource, ConfigurationImage.EMPTY);
+        ConfigurationImage configImage = image.resourceData().getOrDefault(resource,
+                new ConfigurationImage(resource, Collections.emptyMap()));
         ConfigurationDelta delta = changes.computeIfAbsent(resource,
             __ -> new ConfigurationDelta(configImage));
         delta.deleteAll();
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
index 6762391619b..0f0aef4d0a0 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.image.node.ConfigurationsImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 
@@ -26,7 +27,6 @@ import java.util.Map.Entry;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 
 /**
@@ -96,8 +96,6 @@ public final class ConfigurationsImage {
 
     @Override
     public String toString() {
-        return "ConfigurationsImage(data=" + data.entrySet().stream().
-            map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) +
-            ")";
+        return new ConfigurationsImageNode(this).stringify();
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
index 623b45a8dba..4f22588ed02 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.image.node.FeaturesImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.metadata.migration.ZkMigrationState;
@@ -144,13 +145,8 @@ public final class FeaturesImage {
             zkMigrationState.equals(other.zkMigrationState);
     }
 
-
     @Override
     public String toString() {
-        return "FeaturesImage{" +
-                "finalizedVersions=" + finalizedVersions +
-                ", metadataVersion=" + metadataVersion +
-                ", zkMigrationState=" + zkMigrationState +
-                '}';
+        return new FeaturesImageNode(this).stringify();
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index 8643a23e03c..12e41d690df 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.image;
 
+import org.apache.kafka.image.node.MetadataImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.raft.OffsetAndEpoch;
@@ -181,16 +182,6 @@ public final class MetadataImage {
 
     @Override
     public String toString() {
-        return "MetadataImage(" +
-            "provenance=" + provenance +
-            ", features=" + features +
-            ", cluster=" + cluster +
-            ", topics=" + topics +
-            ", configs=" + configs +
-            ", clientQuotas=" + clientQuotas +
-            ", producerIdsImage=" + producerIds +
-            ", acls=" + acls +
-            ", scram=" + scram +
-            ")";
+        return new MetadataImageNode(this).stringify();
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java
index 4c1dea17ebe..5155bb4bda3 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsDelta.java
@@ -25,7 +25,7 @@ public final class ProducerIdsDelta {
     private long nextProducerId;
 
     public ProducerIdsDelta(ProducerIdsImage image) {
-        this.nextProducerId = image.highestSeenProducerId();
+        this.nextProducerId = image.nextProducerId();
     }
 
     public void setNextProducerId(long highestSeenProducerId) {
diff --git a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
index 8f958d0f93a..e1ed7baccb9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ProducerIdsImage.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
+import org.apache.kafka.image.node.ProducerIdsImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 
@@ -32,13 +33,16 @@ import java.util.Objects;
 public final class ProducerIdsImage {
     public final static ProducerIdsImage EMPTY = new ProducerIdsImage(-1L);
 
+    /**
+     * The next producer ID, or -1 in the special case where no producer IDs have been issued.
+     */
     private final long nextProducerId;
 
     public ProducerIdsImage(long nextProducerId) {
         this.nextProducerId = nextProducerId;
     }
 
-    public long highestSeenProducerId() {
+    public long nextProducerId() {
         return nextProducerId;
     }
 
@@ -51,6 +55,10 @@ public final class ProducerIdsImage {
         }
     }
 
+    public boolean isEmpty() {
+        return nextProducerId == EMPTY.nextProducerId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (!(o instanceof ProducerIdsImage)) return false;
@@ -65,10 +73,6 @@ public final class ProducerIdsImage {
 
     @Override
     public String toString() {
-        return "ProducerIdsImage(highestSeenProducerId=" + nextProducerId + ")";
-    }
-
-    public boolean isEmpty() {
-        return nextProducerId < 0;
+        return new ProducerIdsImageNode(this).stringify();
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
index a48d6f66652..19807a63f9e 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ScramImage.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.image;
 
+import org.apache.kafka.image.node.ScramImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.clients.admin.ScramMechanism;
@@ -80,8 +81,8 @@ public final class ScramImage {
 
     private static final String DESCRIBE_DUPLICATE_USER = "Cannot describe SCRAM credentials for the same user twice in a single request: ";
     private static final String DESCRIBE_USER_THAT_DOES_NOT_EXIST = "Attempt to describe a user credential that does not exist: ";
-    public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) {
 
+    public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) {
         List<UserName> users = request.users();
         Map<String, Boolean> uniqueUsers = new HashMap<String, Boolean>();
 
@@ -157,23 +158,6 @@ public final class ScramImage {
 
     @Override
     public String toString() {
-        StringBuilder builder = new StringBuilder();
-        builder.append("ScramImage(");
-        List<ScramMechanism> sortedMechanisms = mechanisms.keySet().stream().sorted().collect(Collectors.toList());
-        String preMechanismComma = "";
-        for (ScramMechanism mechanism : sortedMechanisms) {
-            builder.append(preMechanismComma).append(mechanism).append(": {");
-            Map<String, ScramCredentialData> userMap = mechanisms.get(mechanism);
-            List<String> sortedUserNames = userMap.keySet().stream().sorted().collect(Collectors.toList());
-            String preUserNameComma = "";
-            for (String userName : sortedUserNames) {
-                builder.append(preUserNameComma).append(userName).append("=").append(userMap.get(userName));
-                preUserNameComma = ", ";
-            }
-            builder.append("}");
-            preMechanismComma = ", ";
-        }
-        builder.append(")");
-        return builder.toString();
+        return new ScramImageNode(this).stringify();
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicImage.java b/metadata/src/main/java/org/apache/kafka/image/TopicImage.java
index 1b7751811b2..743e37e8236 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicImage.java
@@ -19,6 +19,7 @@ package org.apache.kafka.image;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.image.node.TopicImageNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.metadata.PartitionRegistration;
@@ -26,7 +27,6 @@ import org.apache.kafka.metadata.PartitionRegistration;
 import java.util.Map.Entry;
 import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 
 /**
@@ -88,9 +88,6 @@ public final class TopicImage {
 
     @Override
     public String toString() {
-        return "TopicImage(name=" + name + ", id=" + id + ", partitions=" +
-            partitions.entrySet().stream().
-                map(e -> e.getKey() + ":" + e.getValue()).
-                collect(Collectors.joining(", ")) + ")";
+        return new TopicImageNode(this).stringify();
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
index 569264b1c4c..a3cd9bc8969 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.image;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.node.TopicsImageByNameNode;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.metadata.PartitionRegistration;
@@ -26,7 +27,6 @@ import org.apache.kafka.server.util.TranslatedValueMapView;
 
 import java.util.Map;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /**
  * Represents the topics in the metadata image.
@@ -39,8 +39,10 @@ public final class TopicsImage {
     private final ImmutableMap<Uuid, TopicImage> topicsById;
     private final ImmutableMap<String, TopicImage> topicsByName;
 
-    public TopicsImage(ImmutableMap<Uuid, TopicImage> topicsById,
-                       ImmutableMap<String, TopicImage> topicsByName) {
+    public TopicsImage(
+        ImmutableMap<Uuid, TopicImage> topicsById,
+        ImmutableMap<String, TopicImage> topicsByName
+    ) {
         this.topicsById = topicsById;
         this.topicsByName = topicsByName;
     }
@@ -116,10 +118,6 @@ public final class TopicsImage {
 
     @Override
     public String toString() {
-        return "TopicsImage(topicsById=" + topicsById.entrySet().stream().
-            map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) +
-            ", topicsByName=" + topicsByName.entrySet().stream().
-            map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) +
-            ")";
+        return new TopicsImageByNameNode(this).stringify();
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/AclsImageByIdNode.java b/metadata/src/main/java/org/apache/kafka/image/node/AclsImageByIdNode.java
new file mode 100644
index 00000000000..33f4b3ec1a5
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/AclsImageByIdNode.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.AclsImage;
+import org.apache.kafka.metadata.authorizer.StandardAcl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+
+public class AclsImageByIdNode implements MetadataNode {
+    /**
+     * The name of this node.
+     */
+    public final static String NAME = "byId";
+
+    /**
+     * The ACLs image.
+     */
+    private final AclsImage image;
+
+    public AclsImageByIdNode(AclsImage image) {
+        this.image = image;
+    }
+
+    @Override
+    public Collection<String> childNames() {
+        ArrayList<String> childNames = new ArrayList<>();
+        for (Uuid uuid : image.acls().keySet()) {
+            childNames.add(uuid.toString());
+        }
+        return childNames;
+    }
+
+    @Override
+    public MetadataNode child(String name) {
+        Uuid uuid;
+        try {
+            uuid = Uuid.fromString(name);
+        } catch (Exception e) {
+            return null;
+        }
+        StandardAcl acl = image.acls().get(uuid);
+        if (acl == null) return null;
+        return new MetadataLeafNode(acl.toString());
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java b/metadata/src/main/java/org/apache/kafka/image/node/AclsImageNode.java
similarity index 53%
copy from shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
copy to metadata/src/main/java/org/apache/kafka/image/node/AclsImageNode.java
index 1756ba76aa8..8152dbd8336 100644
--- a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
+++ b/metadata/src/main/java/org/apache/kafka/image/node/AclsImageNode.java
@@ -15,29 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.image.node;
 
-import java.io.PrintWriter;
-import java.util.Optional;
+import org.apache.kafka.image.AclsImage;
 
-/**
- * Does nothing.
- */
-public final class NoOpCommandHandler implements Commands.Handler {
-    @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) {
+import java.util.Collection;
+import java.util.Collections;
+
+
+public class AclsImageNode implements MetadataNode {
+    /**
+     * The name under which this node appears in its parent.
+     */
+    public static final String NAME = "acls";
+
+    /**
+     * The ACLs image exposed by this node.
+     */
+    private final AclsImage image;
+
+    public AclsImageNode(AclsImage image) {
+        this.image = image;
     }
 
+    /**
+     * The only child is the by-ID view of the ACLs.
+     */
     @Override
-    public int hashCode() {
-        return 0;
+    public Collection<String> childNames() {
+        return Collections.singletonList(AclsImageByIdNode.NAME);
     }
 
+    /** Returns the by-ID node for its name, or null for any other name. */
     @Override
-    public boolean equals(Object other) {
-        if (!(other instanceof NoOpCommandHandler)) return false;
-        return true;
+    public MetadataNode child(String name) {
+        return name.equals(AclsImageByIdNode.NAME) ? new AclsImageByIdNode(image) : null;
     }
 }
diff --git a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java b/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotaImageNode.java
similarity index 57%
copy from shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
copy to metadata/src/main/java/org/apache/kafka/image/node/ClientQuotaImageNode.java
index 1756ba76aa8..725c5620ed9 100644
--- a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
+++ b/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotaImageNode.java
@@ -15,29 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.image.node;
 
-import java.io.PrintWriter;
-import java.util.Optional;
+import org.apache.kafka.image.ClientQuotaImage;
 
-/**
- * Does nothing.
- */
-public final class NoOpCommandHandler implements Commands.Handler {
-    @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) {
+import java.util.Collection;
+
+
+public class ClientQuotaImageNode implements MetadataNode {
+    /**
+     * The client quota image whose quota map backs this node.
+     */
+    private final ClientQuotaImage image;
+
+    public ClientQuotaImageNode(ClientQuotaImage image) {
+        this.image = image;
     }
 
+    /** Every key of the quota map is a child of this node. */
     @Override
-    public int hashCode() {
-        return 0;
+    public Collection<String> childNames() {
+        return image.quotaMap().keySet();
     }
 
     @Override
-    public boolean equals(Object other) {
-        if (!(other instanceof NoOpCommandHandler)) return false;
-        return true;
+    public MetadataNode child(String name) {
+        Double quota = image.quotaMap().get(name);
+        return quota == null ? null : new MetadataLeafNode(String.valueOf(quota));
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java b/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java
new file mode 100644
index 00000000000..e13bbcf3d64
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.image.ClientQuotaImage;
+import org.apache.kafka.image.ClientQuotasImage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID;
+import static org.apache.kafka.common.quota.ClientQuotaEntity.IP;
+import static org.apache.kafka.common.quota.ClientQuotaEntity.USER;
+
+
+public class ClientQuotasImageNode implements MetadataNode {
+    /**
+     * The name of this node.
+     */
+    public static final String NAME = "clientQuotas";
+
+    /**
+     * The client quotas image.
+     */
+    private final ClientQuotasImage image;
+
+    public ClientQuotasImageNode(ClientQuotasImage image) {
+        this.image = image;
+    }
+
+    /** Lists one child per quota entity, named by its encoded string form. */
+    @Override
+    public Collection<String> childNames() {
+        ArrayList<String> childNames = new ArrayList<>();
+        image.entities().keySet().forEach(entity -> childNames.add(clientQuotaEntityToString(entity)));
+        return childNames;
+    }
+
+    /**
+     * Encode a quota entity as a node name. Each non-null component is written as
+     * type(escapedValue), in the order clientId, ip, user, joined with '_'.
+     *
+     * @throws RuntimeException if the entity has no entries or an unknown entry type.
+     */
+    static String clientQuotaEntityToString(ClientQuotaEntity entity) {
+        if (entity.entries().isEmpty()) {
+            throw new RuntimeException("Invalid empty entity");
+        }
+        String clientId = null;
+        String ip = null;
+        String user = null;
+        for (Map.Entry<String, String> entry : entity.entries().entrySet()) {
+            if (entry.getKey().equals(CLIENT_ID)) {
+                clientId = entry.getValue();
+            } else if (entry.getKey().equals(IP)) {
+                ip = entry.getValue();
+            } else if (entry.getKey().equals(USER)) {
+                user = entry.getValue();
+            } else {
+                throw new RuntimeException("Invalid entity type " + entry.getKey());
+            }
+        }
+        StringBuilder bld = new StringBuilder();
+        appendComponent(bld, "clientId", clientId);
+        appendComponent(bld, "ip", ip);
+        appendComponent(bld, "user", user);
+        return bld.toString();
+    }
+
+    /**
+     * Append label(escapedValue) to the builder, preceded by '_' when the builder
+     * already holds a component. Does nothing when the value is null.
+     */
+    private static void appendComponent(StringBuilder bld, String label, String value) {
+        if (value == null) return;
+        if (bld.length() > 0) bld.append('_');
+        bld.append(label).append('(').append(escape(value)).append(')');
+    }
+
+    /** Backslash-escape every backslash and parenthesis in the input. */
+    static String escape(String input) {
+        return input.replace("\\", "\\\\").replace("(", "\\(").replace(")", "\\)");
+    }
+
+    @Override
+    public MetadataNode child(String name) {
+        ClientQuotaEntity entity = decodeEntity(name);
+        if (entity == null) return null;
+        ClientQuotaImage clientQuotaImage = image.entities().get(entity);
+        if (clientQuotaImage == null) return null;
+        return new ClientQuotaImageNode(clientQuotaImage);
+    }
+
+    /**
+     * Parse a node name written by clientQuotaEntityToString; returns null if it is malformed.
+     */
+    static ClientQuotaEntity decodeEntity(String input) {
+        Map<String, String> entries = new HashMap<>();
+        int i = 0;
+        while (i < input.length()) {
+            // Match the type prefix in place rather than copying the tail of the input.
+            String type;
+            if (input.startsWith("clientId(", i)) {
+                type = CLIENT_ID;
+                i += "clientId(".length();
+            } else if (input.startsWith("ip(", i)) {
+                type = IP;
+                i += "ip(".length();
+            } else if (input.startsWith("user(", i)) {
+                type = USER;
+                i += "user(".length();
+            } else {
+                return null;
+            }
+            // Read up to the first unescaped ')'; a backslash makes the next char literal.
+            StringBuilder value = new StringBuilder();
+            boolean escaping = false;
+            boolean closed = false;
+            while (!closed) {
+                if (i >= input.length()) return null;
+                char c = input.charAt(i++);
+                if (escaping) {
+                    value.append(c);
+                    escaping = false;
+                } else if (c == ')') {
+                    closed = true;
+                } else if (c == '\\') {
+                    escaping = true;
+                } else {
+                    value.append(c);
+                }
+            }
+            entries.put(type, value.toString());
+            if (i >= input.length()) return new ClientQuotaEntity(entries);
+            if (input.charAt(i++) != '_') return null;
+        }
+        return null;
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/ClusterImageNode.java b/metadata/src/main/java/org/apache/kafka/image/node/ClusterImageNode.java
new file mode 100644
index 00000000000..5788160a0ce
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/ClusterImageNode.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.metadata.BrokerRegistration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+
+public class ClusterImageNode implements MetadataNode {
+    /**
+     * The name under which this node appears in its parent.
+     */
+    public final static String NAME = "cluster";
+
+    /**
+     * The cluster image whose broker registrations are exposed as children.
+     */
+    private final ClusterImage image;
+
+    public ClusterImageNode(ClusterImage image) {
+        this.image = image;
+    }
+
+    /** Lists one child per broker in the image, named by the broker ID. */
+    @Override
+    public Collection<String> childNames() {
+        ArrayList<String> names = new ArrayList<>();
+        image.brokers().keySet().forEach(id -> names.add(id.toString()));
+        return names;
+    }
+
+    /**
+     * Returns a leaf for the broker with the given ID, or null if the name is not an ID in the image.
+     */
+    @Override
+    public MetadataNode child(String name) {
+        try {
+            BrokerRegistration registration = image.brokers().get(Integer.valueOf(name));
+            return registration == null ? null : new MetadataLeafNode(registration.toString());
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/ConfigurationImageNode.java b/metadata/src/main/java/org/apache/kafka/image/node/ConfigurationImageNode.java
new file mode 100644
index 00000000000..c7262f249cc
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/ConfigurationImageNode.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.ConfigurationImage;
+import org.apache.kafka.image.node.printer.MetadataNodePrinter;
+
+import java.util.Collection;
+
+
+public class ConfigurationImageNode implements MetadataNode {
+    /**
+     * The configuration image for a specific resource.
+     */
+    private final ConfigurationImage image;
+
+    public ConfigurationImageNode(ConfigurationImage image) {
+        this.image = image;
+    }
+
+    /** Every configuration key of the resource is a child of this node. */
+    @Override
+    public Collection<String> childNames() {
+        return image.data().keySet();
+    }
+
+    /**
+     * Returns a leaf for the given configuration key, or null if the key is not set.
+     */
+    @Override
+    public MetadataNode child(String name) {
+        String value = image.data().get(name);
+        return value == null ? null : new MetadataNode() {
+            @Override
+            public boolean isDirectory() {
+                return false;
+            }
+
+            @Override
+            public void print(MetadataNodePrinter printer) {
+                boolean redact = printer.redactionCriteria().
+                        shouldRedactConfig(image.resource().type(), name);
+                printer.output(redact ? "[redacted]" : value);
+            }
+        };
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/ConfigurationsImageNode.java b/metadata/src/main/java/org/apache/kafka/image/node/ConfigurationsImageNode.java
new file mode 100644
index 00000000000..6d30c13609d
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/ConfigurationsImageNode.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.image.ConfigurationImage;
+import org.apache.kafka.image.ConfigurationsImage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+
+public class ConfigurationsImageNode implements MetadataNode {
+    /**
+     * The name of this node.
+     */
+    public final static String NAME = "configs";
+
+    /**
+     * The configurations image.
+     */
+    private final ConfigurationsImage image;
+
+    public ConfigurationsImageNode(ConfigurationsImage image) {
+        this.image = image;
+    }
+
+    @Override
+    public Collection<String> childNames() {
+        ArrayList<String> childNames = new ArrayList<>();
+        for (ConfigResource configResource : image.resourceData().keySet()) {
+            if (configResource.isDefault()) {
+                childNames.add(configResource.type().name());
+            } else {
+                childNames.add(configResource.type().name() + ":" + configResource.name());
+            }
+        }
+        return childNames;
+    }
+
+    /**
+     * Parse a child name of the form TYPE or TYPE:resourceName; a bare TYPE yields an
+     * empty resource name. The type is matched exactly against the text before the first
+     * ':', so prefix-sharing type names cannot shadow each other. Returns null if unknown.
+     */
+    static ConfigResource resourceFromName(String name) {
+        int colon = name.indexOf(':');
+        String typeName = colon < 0 ? name : name.substring(0, colon);
+        String resourceName = colon < 0 ? "" : name.substring(colon + 1);
+        for (ConfigResource.Type type : ConfigResource.Type.values()) {
+            if (type.name().equals(typeName)) {
+                return new ConfigResource(type, resourceName);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public MetadataNode child(String name) {
+        ConfigResource resource = resourceFromName(name);
+        if (resource == null) return null;
+        ConfigurationImage configurationImage = image.resourceData().get(resource);
+        return configurationImage == null ? null : new ConfigurationImageNode(configurationImage);
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/FeaturesImageNode.java b/metadata/src/main/java/org/apache/kafka/image/node/FeaturesImageNode.java
new file mode 100644
index 00000000000..33b51bfd588
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/FeaturesImageNode.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.FeaturesImage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+
+public class FeaturesImageNode implements MetadataNode {
+    /**
+     * The name of this node.
+     */
+    public final static String NAME = "features";
+
+    /**
+     * The name of the metadata version child node.
+     */
+    public final static String METADATA_VERSION = "metadataVersion";
+
+    /**
+     * The name of the zk migration state child node.
+     */
+    public final static String ZK_MIGRATION_STATE = "zkMigrationState";
+
+    /**
+     * The prefix to put before finalized feature children.
+     */
+    public final static String FINALIZED_PREFIX = "finalized_";
+
+    /**
+     * The features image.
+     */
+    private final FeaturesImage image;
+
+    public FeaturesImageNode(FeaturesImage image) {
+        this.image = image;
+    }
+
+    @Override
+    public Collection<String> childNames() {
+        ArrayList<String> names = new ArrayList<>();
+        names.add(METADATA_VERSION);
+        names.add(ZK_MIGRATION_STATE);
+        image.finalizedVersions().keySet().forEach(f -> names.add(FINALIZED_PREFIX + f));
+        return names;
+    }
+
+    /**
+     * Resolves a fixed child or a prefixed finalized feature; an absent feature yields a leaf of 0.
+     */
+    @Override
+    public MetadataNode child(String name) {
+        if (name.equals(METADATA_VERSION)) {
+            return new MetadataLeafNode(image.metadataVersion().toString());
+        }
+        if (name.equals(ZK_MIGRATION_STATE)) {
+            return new MetadataLeafNode(image.zkMigrationState().toString());
+        }
+        if (!name.startsWith(FINALIZED_PREFIX)) return null;
+        String featureName = name.substring(FINALIZED_PREFIX.length());
+        return new MetadataLeafNode(
+                image.finalizedVersions().getOrDefault(featureName, (short) 0).toString());
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java b/metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java
new file mode 100644
index 00000000000..df5f28f34e2
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.MetadataImage;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+
+public class MetadataImageNode implements MetadataNode {
+    /** The name of this node. */
+    public final static String NAME = "image";
+
+    /** The metadata image. */
+    private final MetadataImage image;
+
+    /** Factories that build each child node from a MetadataImage, keyed by child name. */
+    private final static Map<String, Function<MetadataImage, MetadataNode>> CHILDREN;
+
+    static {
+        Map<String, Function<MetadataImage, MetadataNode>> children = new HashMap<>();
+        children.put(ProvenanceNode.NAME, image -> new ProvenanceNode(image.provenance()));
+        children.put(FeaturesImageNode.NAME, image -> new FeaturesImageNode(image.features()));
+        children.put(ClusterImageNode.NAME, image -> new ClusterImageNode(image.cluster()));
+        children.put(TopicsImageNode.NAME, image -> new TopicsImageNode(image.topics()));
+        children.put(ConfigurationsImageNode.NAME, image -> new ConfigurationsImageNode(image.configs()));
+        children.put(ClientQuotasImageNode.NAME, image -> new ClientQuotasImageNode(image.clientQuotas()));
+        children.put(ProducerIdsImageNode.NAME, image -> new ProducerIdsImageNode(image.producerIds()));
+        // The "acls" child is the AclsImageNode directory, which exposes the by-ID view as its child.
+        children.put(AclsImageNode.NAME, image -> new AclsImageNode(image.acls()));
+        children.put(ScramImageNode.NAME, image -> new ScramImageNode(image.scram()));
+        CHILDREN = Collections.unmodifiableMap(children);
+    }
+
+    public MetadataImageNode(MetadataImage image) {
+        this.image = image;
+    }
+
+    public MetadataImage image() {
+        return image;
+    }
+
+    /** The child names are fixed; they do not depend on the image contents. */
+    @Override
+    public Collection<String> childNames() {
+        return CHILDREN.keySet();
+    }
+
+    /** Builds the named child from the image, or returns null for an unknown name. */
+    @Override
+    public MetadataNode child(String name) {
+        return CHILDREN.getOrDefault(name, __ -> null).apply(image);
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/NotFileException.java b/metadata/src/main/java/org/apache/kafka/image/node/MetadataLeafNode.java
similarity index 64%
rename from shell/src/main/java/org/apache/kafka/shell/NotFileException.java
rename to metadata/src/main/java/org/apache/kafka/image/node/MetadataLeafNode.java
index cbc2a832d67..3e55e670580 100644
--- a/shell/src/main/java/org/apache/kafka/shell/NotFileException.java
+++ b/metadata/src/main/java/org/apache/kafka/image/node/MetadataLeafNode.java
@@ -15,16 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.image.node;
 
-/**
- * An exception that is thrown when a non-file node is treated like a
- * file.
- */
-public class NotFileException extends RuntimeException {
-    private static final long serialVersionUID = 1L;
+import org.apache.kafka.image.node.printer.MetadataNodePrinter;
+
+
+public class MetadataLeafNode implements MetadataNode {
+    private final String text; // the text this leaf outputs when printed
+
+    public MetadataLeafNode(String string) {
+        this.text = string;
+    }
+
+    @Override
+    public boolean isDirectory() {
+        return false;
+    }
 
-    public NotFileException() {
-        super();
+    @Override
+    public void print(MetadataNodePrinter printer) {
+        printer.output(text);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/MetadataNode.java b/metadata/src/main/java/org/apache/kafka/image/node/MetadataNode.java
new file mode 100644
index 00000000000..7b7eba78789
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/MetadataNode.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.node.printer.MetadataNodePrinter;
+import org.apache.kafka.image.node.printer.NodeStringifier;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+
+public interface MetadataNode {
+    /** Returns true unless overridden; leaf implementations override this to return false. */
+    default boolean isDirectory() {
+        return true;
+    }
+
+    /**
+     * Get the names of this node's children; the default is an empty collection.
+     */
+    default Collection<String> childNames() {
+        return Collections.emptyList();
+    }
+
+    /**
+     * Get the child with the given name, or null if there is none (the default).
+     */
+    default MetadataNode child(String name) {
+        return null;
+    }
+
+    /**
+     * Print this node. By default, each child is printed in sorted name order.
+     */
+    default void print(MetadataNodePrinter printer) {
+        ArrayList<String> sortedNames = new ArrayList<>(childNames());
+        Collections.sort(sortedNames);
+        for (String childName : sortedNames) {
+            printer.enterNode(childName);
+            child(childName).print(printer);
+            printer.leaveNode();
+        }
+    }
+
+    /**
+     * Print this node into a new NodeStringifier and return the resulting string.
+     */
+    default String stringify() {
+        NodeStringifier out = new NodeStringifier();
+        print(out);
+        return out.toString();
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java b/metadata/src/main/java/org/apache/kafka/image/node/ProducerIdsImageNode.java
similarity index 51%
copy from shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
copy to metadata/src/main/java/org/apache/kafka/image/node/ProducerIdsImageNode.java
index 1756ba76aa8..91b9457b536 100644
--- a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
+++ b/metadata/src/main/java/org/apache/kafka/image/node/ProducerIdsImageNode.java
@@ -15,29 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.image.node;
 
-import java.io.PrintWriter;
-import java.util.Optional;
+import org.apache.kafka.image.ProducerIdsImage;
 
-/**
- * Does nothing.
- */
-public final class NoOpCommandHandler implements Commands.Handler {
-    @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) {
+import java.util.Collection;
+import java.util.Collections;
+
+
+public class ProducerIdsImageNode implements MetadataNode {
+    /**
+     * The name of this node.
+     */
+    public static final String NAME = "producerIds";
+
+    /** The name of the only child, which holds the next producer ID. */
+    private static final String NEXT_PRODUCER_ID = "nextProducerId";
+
+    /** The producer IDs image. */
+    private final ProducerIdsImage image;
+
+    public ProducerIdsImageNode(ProducerIdsImage image) {
+        this.image = image;
     }
 
+    /** This node always has exactly one child. */
     @Override
-    public int hashCode() {
-        return 0;
+    public Collection<String> childNames() {
+        return Collections.singletonList(NEXT_PRODUCER_ID);
     }
 
+    /** Returns the next-producer-ID leaf for its name, or null for any other name. */
     @Override
-    public boolean equals(Object other) {
-        if (!(other instanceof NoOpCommandHandler)) return false;
-        return true;
+    public MetadataNode child(String name) {
+        if (!name.equals(NEXT_PRODUCER_ID)) return null;
+        return new MetadataLeafNode(String.valueOf(image.nextProducerId()));
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/ProvenanceNode.java b/metadata/src/main/java/org/apache/kafka/image/node/ProvenanceNode.java
new file mode 100644
index 00000000000..00b468fb90e
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/ProvenanceNode.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.node.printer.MetadataNodePrinter;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+
+
+public class ProvenanceNode implements MetadataNode {
+    /**
+     * The name of this node.
+     */
+    public final static String NAME = "provenance";
+
+    /** The metadata provenance. */
+    private final MetadataProvenance provenance;
+
+    public ProvenanceNode(MetadataProvenance provenance) {
+        this.provenance = provenance;
+    }
+
+    /** This node is a leaf. */
+    @Override
+    public boolean isDirectory() {
+        return false;
+    }
+
+    /** Outputs the last contained offset, epoch, and log time, with the time rendered in UTC. */
+    @Override
+    public void print(MetadataNodePrinter printer) {
+        Instant logTime = Instant.ofEpochMilli(provenance.lastContainedLogTimeMs());
+        ZonedDateTime utcLogTime = logTime.atZone(ZoneId.of("UTC"));
+        String time = DateTimeFormatter.ISO_ZONED_DATE_TIME.format(utcLogTime);
+        printer.output("offset " + provenance.lastContainedOffset() +
+                ", epoch " + provenance.lastContainedEpoch() + ", time " + time);
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/ScramCredentialDataNode.java b/metadata/src/main/java/org/apache/kafka/image/node/ScramCredentialDataNode.java
new file mode 100644
index 00000000000..2c3ca9faaf1
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/ScramCredentialDataNode.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.node.printer.MetadataNodePrinter;
+import org.apache.kafka.metadata.ScramCredentialData;
+
+
+/**
+ * A non-directory node that prints a single SCRAM credential. When the printer's
+ * redaction criteria ask for SCRAM data to be redacted, every field (including the
+ * iteration count) is printed as "[redacted]".
+ */
+public class ScramCredentialDataNode implements MetadataNode {
+    /**
+     * Lower-case hexadecimal digits, indexed by nibble value.
+     */
+    private static final char[] HEX_DIGITS = "0123456789abcdef".toCharArray();
+
+    /**
+     * The credential to print.
+     */
+    private final ScramCredentialData data;
+
+    public ScramCredentialDataNode(ScramCredentialData data) {
+        this.data = data;
+    }
+
+    /**
+     * @return          Always false.
+     */
+    @Override
+    public boolean isDirectory() {
+        return false;
+    }
+
+    /**
+     * Append each byte of the array as two lower-case hexadecimal digits.
+     *
+     * @param array     The bytes to encode.
+     * @param bld       The builder to append to.
+     */
+    private static void arrayToHex(byte[] array, StringBuilder bld) {
+        for (byte b : array) {
+            // Mask after shifting so that the sign extension of negative bytes is dropped.
+            bld.append(HEX_DIGITS[(b >> 4) & 0xf]).append(HEX_DIGITS[b & 0xf]);
+        }
+    }
+
+    /**
+     * Print the credential as
+     * "ScramCredentialData(salt=..., storedKey=..., serverKey=..., iterations=...)".
+     *
+     * @param printer   The printer which supplies the redaction criteria and
+     *                  receives the text.
+     */
+    @Override
+    public void print(MetadataNodePrinter printer) {
+        // The redaction decision is the same for every field, so ask for it only once.
+        boolean redact = printer.redactionCriteria().shouldRedactScram();
+        StringBuilder bld = new StringBuilder();
+        if (redact) {
+            // The credential itself is never read on this path.
+            bld.append("ScramCredentialData");
+            bld.append("(salt=[redacted]");
+            bld.append(", storedKey=[redacted]");
+            bld.append(", serverKey=[redacted]");
+            bld.append(", iterations=[redacted]");
+        } else {
+            bld.append("ScramCredentialData");
+            bld.append("(salt=");
+            arrayToHex(data.salt(), bld);
+            bld.append(", storedKey=");
+            arrayToHex(data.storedKey(), bld);
+            bld.append(", serverKey=");
+            arrayToHex(data.serverKey(), bld);
+            bld.append(", iterations=");
+            bld.append(data.iterations());
+        }
+        bld.append(")");
+        printer.output(bld.toString());
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/ScramImageNode.java b/metadata/src/main/java/org/apache/kafka/image/node/ScramImageNode.java
new file mode 100644
index 00000000000..66b1fe692e2
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/ScramImageNode.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.image.ScramImage;
+import org.apache.kafka.metadata.ScramCredentialData;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+
+/**
+ * The node for the SCRAM image. It has one child per SCRAM mechanism other than
+ * UNKNOWN, whether or not the image holds any entries for that mechanism.
+ */
+public class ScramImageNode implements MetadataNode {
+    /**
+     * The name of this node.
+     */
+    public final static String NAME = "scram";
+
+    /**
+     * The SCRAM image whose mechanisms are exposed as children.
+     */
+    private final ScramImage image;
+
+    public ScramImageNode(ScramImage image) {
+        this.image = image;
+    }
+
+    /**
+     * @return          The mechanism name of every ScramMechanism value except UNKNOWN.
+     */
+    @Override
+    public Collection<String> childNames() {
+        ArrayList<String> names = new ArrayList<>();
+        for (ScramMechanism candidate : ScramMechanism.values()) {
+            if (candidate.equals(ScramMechanism.UNKNOWN)) {
+                continue;
+            }
+            names.add(candidate.mechanismName());
+        }
+        return names;
+    }
+
+    /**
+     * Look up the node for a mechanism.
+     *
+     * @param name      The mechanism name.
+     *
+     * @return          Null if the name maps to the UNKNOWN mechanism; otherwise a
+     *                  node for the mechanism, backed by an empty map when the
+     *                  image has no entries for it.
+     */
+    @Override
+    public MetadataNode child(String name) {
+        ScramMechanism requested = ScramMechanism.fromMechanismName(name);
+        if (requested.equals(ScramMechanism.UNKNOWN)) {
+            return null;
+        }
+        Map<String, ScramCredentialData> credentials = image.mechanisms().get(requested);
+        if (credentials == null) {
+            return new ScramMechanismNode(Collections.emptyMap());
+        }
+        return new ScramMechanismNode(credentials);
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java b/metadata/src/main/java/org/apache/kafka/image/node/ScramMechanismNode.java
similarity index 54%
copy from shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
copy to metadata/src/main/java/org/apache/kafka/image/node/ScramMechanismNode.java
index 1756ba76aa8..e72a54f4247 100644
--- a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
+++ b/metadata/src/main/java/org/apache/kafka/image/node/ScramMechanismNode.java
@@ -15,29 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.image.node;
 
-import java.io.PrintWriter;
-import java.util.Optional;
+import org.apache.kafka.metadata.ScramCredentialData;
 
-/**
- * Does nothing.
- */
-public final class NoOpCommandHandler implements Commands.Handler {
-    @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) {
+import java.util.Collection;
+import java.util.Map;
+
+
+/**
+ * The node for a single SCRAM mechanism. Each child is named after a key of the
+ * map passed to the constructor and prints the credential stored under that key.
+ */
+public class ScramMechanismNode implements MetadataNode {
+    /**
+     * The credential entries for this mechanism, keyed by child name.
+     * NOTE(review): the keys are presumably user names; confirm against the caller.
+     */
+    private final Map<String, ScramCredentialData> userEntries;
+
+    public ScramMechanismNode(Map<String, ScramCredentialData> userEntries) {
+        this.userEntries = userEntries;
     }
 
+    /**
+     * @return          The key set of the entries map. This is the map's own key
+     *                  set, not a copy.
+     */
     @Override
-    public int hashCode() {
-        return 0;
+    public Collection<String> childNames() {
+        return userEntries.keySet();
     }
 
+    /**
+     * Look up the node for one entry.
+     *
+     * @param name      The key of the entry.
+     *
+     * @return          A node for the credential stored under the key, or null if
+     *                  the map has no value for it.
+     */
     @Override
-    public boolean equals(Object other) {
-        if (!(other instanceof NoOpCommandHandler)) return false;
-        return true;
+    public MetadataNode child(String name) {
+        ScramCredentialData data = userEntries.get(name);
+        if (data == null) return null;
+        return new ScramCredentialDataNode(data);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/TopicImageNode.java b/metadata/src/main/java/org/apache/kafka/image/node/TopicImageNode.java
new file mode 100644
index 00000000000..97f184d2b76
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/TopicImageNode.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.metadata.PartitionRegistration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+
+/**
+ * The node for a single topic. Its children are "name", "id", and one child per
+ * partition, named after the partition ID.
+ */
+public final class TopicImageNode implements MetadataNode {
+    /**
+     * The topic image which backs this node.
+     */
+    private final TopicImage image;
+
+    public TopicImageNode(TopicImage image) {
+        this.image = image;
+    }
+
+    /**
+     * @return          "name" and "id", followed by the string form of every
+     *                  partition ID in the image.
+     */
+    @Override
+    public Collection<String> childNames() {
+        ArrayList<String> names = new ArrayList<>();
+        names.add("name");
+        names.add("id");
+        image.partitions().keySet().forEach(id -> names.add(id.toString()));
+        return names;
+    }
+
+    /**
+     * Look up a child of this topic.
+     *
+     * @param name      "name", "id", or a partition ID in decimal.
+     *
+     * @return          A leaf node holding the topic name, the topic ID, or the
+     *                  string form of the partition registration; null if the name
+     *                  is not a number or no such partition exists.
+     */
+    @Override
+    public MetadataNode child(String name) {
+        if (name.equals("name")) {
+            return new MetadataLeafNode(image.name());
+        }
+        if (name.equals("id")) {
+            return new MetadataLeafNode(image.id().toString());
+        }
+        // Anything else can only be a partition ID.
+        int partitionId;
+        try {
+            partitionId = Integer.parseInt(name);
+        } catch (NumberFormatException e) {
+            return null;
+        }
+        PartitionRegistration registration = image.partitions().get(partitionId);
+        return registration == null ? null : new MetadataLeafNode(registration.toString());
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/TopicsImageByIdNode.java b/metadata/src/main/java/org/apache/kafka/image/node/TopicsImageByIdNode.java
new file mode 100644
index 00000000000..6ccc86a754d
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/TopicsImageByIdNode.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+
+/**
+ * The node which lists the topics of a topics image by topic ID.
+ */
+public class TopicsImageByIdNode implements MetadataNode {
+    /**
+     * The name of this node.
+     */
+    public final static String NAME = "byId";
+
+    /**
+     * The topics image which backs this node.
+     */
+    private final TopicsImage image;
+
+    public TopicsImageByIdNode(TopicsImage image) {
+        this.image = image;
+    }
+
+    /**
+     * @return          The string form of every topic ID in the image.
+     */
+    @Override
+    public Collection<String> childNames() {
+        ArrayList<String> names = new ArrayList<>();
+        image.topicsById().keySet().forEach(topicId -> names.add(topicId.toString()));
+        return names;
+    }
+
+    /**
+     * Look up the node for a topic.
+     *
+     * @param name      The string form of the topic ID.
+     *
+     * @return          The node for the topic, or null if the name cannot be
+     *                  parsed as an ID or the image has no topic with that ID.
+     */
+    @Override
+    public MetadataNode child(String name) {
+        Uuid topicId = parseIdOrNull(name);
+        if (topicId == null) {
+            return null;
+        }
+        TopicImage found = image.topicsById().get(topicId);
+        return found == null ? null : new TopicImageNode(found);
+    }
+
+    /**
+     * Parse a topic ID, mapping any failure to null.
+     */
+    private static Uuid parseIdOrNull(String text) {
+        try {
+            return Uuid.fromString(text);
+        } catch (Exception e) {
+            return null;
+        }
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java b/metadata/src/main/java/org/apache/kafka/image/node/TopicsImageByNameNode.java
similarity index 52%
copy from shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
copy to metadata/src/main/java/org/apache/kafka/image/node/TopicsImageByNameNode.java
index 1756ba76aa8..1a1902b807d 100644
--- a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
+++ b/metadata/src/main/java/org/apache/kafka/image/node/TopicsImageByNameNode.java
@@ -15,29 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.image.node;
 
-import java.io.PrintWriter;
-import java.util.Optional;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.image.TopicsImage;
 
-/**
- * Does nothing.
- */
-public final class NoOpCommandHandler implements Commands.Handler {
-    @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) {
+import java.util.Collection;
+
+
+/**
+ * The node which lists the topics of a topics image by topic name.
+ */
+public class TopicsImageByNameNode implements MetadataNode {
+    /**
+     * The name of this node.
+     */
+    public final static String NAME = "byName";
+
+    /**
+     * The topics image which backs this node.
+     */
+    private final TopicsImage image;
+
+    public TopicsImageByNameNode(TopicsImage image) {
+        this.image = image;
     }
 
+    /**
+     * @return          The key set of the image's by-name topic map. This is the
+     *                  map's own key set, not a copy.
+     */
     @Override
-    public int hashCode() {
-        return 0;
+    public Collection<String> childNames() {
+        return image.topicsByName().keySet();
     }
 
+    /**
+     * Look up the node for a topic.
+     *
+     * @param name      The topic name.
+     *
+     * @return          The node for the topic, or null if the image has no topic
+     *                  with that name.
+     */
     @Override
-    public boolean equals(Object other) {
-        if (!(other instanceof NoOpCommandHandler)) return false;
-        return true;
+    public MetadataNode child(String name) {
+        TopicImage topicImage = image.topicsByName().get(name);
+        if (topicImage == null) return null;
+        return new TopicImageNode(topicImage);
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/TopicsImageNode.java b/metadata/src/main/java/org/apache/kafka/image/node/TopicsImageNode.java
new file mode 100644
index 00000000000..f8286e7bae2
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/TopicsImageNode.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.TopicsImage;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+
+/**
+ * The node for the topics image. It has exactly two children: one which lists
+ * the topics by name, and one which lists them by ID.
+ */
+public class TopicsImageNode implements MetadataNode {
+    /**
+     * The name of this node.
+     */
+    public final static String NAME = "topics";
+
+    /**
+     * The topics image which is handed to both children.
+     */
+    private final TopicsImage image;
+
+    public TopicsImageNode(TopicsImage image) {
+        this.image = image;
+    }
+
+    /**
+     * @return          The names of the by-name and by-ID children, in that order.
+     */
+    @Override
+    public Collection<String> childNames() {
+        return Arrays.asList(
+            TopicsImageByNameNode.NAME,
+            TopicsImageByIdNode.NAME);
+    }
+
+    /**
+     * Look up one of the two children.
+     *
+     * @param name      The child name.
+     *
+     * @return          The by-name or by-ID node, or null for any other name.
+     */
+    @Override
+    public MetadataNode child(String name) {
+        switch (name) {
+            case TopicsImageByNameNode.NAME:
+                return new TopicsImageByNameNode(image);
+            case TopicsImageByIdNode.NAME:
+                return new TopicsImageByIdNode(image);
+            default:
+                return null;
+        }
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java b/metadata/src/main/java/org/apache/kafka/image/node/printer/MetadataNodePrinter.java
similarity index 55%
rename from shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java
rename to metadata/src/main/java/org/apache/kafka/image/node/printer/MetadataNodePrinter.java
index 692534758e2..661205660f1 100644
--- a/shell/src/main/java/org/apache/kafka/shell/NotDirectoryException.java
+++ b/metadata/src/main/java/org/apache/kafka/image/node/printer/MetadataNodePrinter.java
@@ -15,16 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.image.node.printer;
 
-/**
- * An exception that is thrown when a non-directory node is treated like a
- * directory.
- */
-public class NotDirectoryException extends RuntimeException {
-    private static final long serialVersionUID = 1L;
 
-    public NotDirectoryException() {
-        super();
+/**
+ * Receives the structure and text emitted when metadata nodes are printed, and
+ * supplies the redaction criteria that nodes consult while printing.
+ *
+ * NOTE(review): each enterNode call is presumably matched by a later leaveNode
+ * call; confirm against the code that walks the node tree.
+ */
+public interface MetadataNodePrinter extends AutoCloseable {
+    /**
+     * Find out the redaction criteria to use when printing.
+     *
+     * @return          The redaction criteria to use when printing.
+     */
+    MetadataNodeRedactionCriteria redactionCriteria();
+
+    /**
+     * Begin visiting a node.
+     *
+     * @param name      The node name.
+     */
+    void enterNode(String name);
+
+    /**
+     * Leave a node.
+     */
+    void leaveNode();
+
+    /**
+     * Print text.
+     *
+     * @param text      The text to print.
+     */
+    void output(String text);
+
+    /**
+     * Close this printer. The default implementation does nothing. Unlike
+     * AutoCloseable#close, this method declares no checked exception.
+     */
+    default void close() {
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/printer/MetadataNodeRedactionCriteria.java b/metadata/src/main/java/org/apache/kafka/image/node/printer/MetadataNodeRedactionCriteria.java
new file mode 100644
index 00000000000..36b80ee09e5
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/printer/MetadataNodeRedactionCriteria.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node.printer;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.metadata.KafkaConfigSchema;
+
+
+/**
+ * Decides which pieces of metadata are hidden when metadata nodes are printed.
+ * Three implementations are provided: Strict, Normal and Disabled.
+ */
+public interface MetadataNodeRedactionCriteria {
+    /**
+     * Returns true if SCRAM data should be redacted.
+     */
+    boolean shouldRedactScram();
+
+    /**
+     * Returns true if a configuration should be redacted.
+     *
+     * @param type      The configuration type.
+     * @param key       The configuration key.
+     *
+     * @return          True if the configuration should be redacted.
+     */
+    boolean shouldRedactConfig(ConfigResource.Type type, String key);
+
+    /**
+     * Redacts SCRAM data and every configuration, whatever its type or key.
+     */
+    class Strict implements MetadataNodeRedactionCriteria {
+        public static final Strict INSTANCE = new Strict();
+
+        @Override
+        public boolean shouldRedactScram() {
+            return true;
+        }
+
+        @Override
+        public boolean shouldRedactConfig(ConfigResource.Type type, String key) {
+            return true;
+        }
+    }
+
+    /**
+     * Redacts SCRAM data, and redacts exactly those configurations which the
+     * supplied configuration schema reports as sensitive.
+     */
+    class Normal implements MetadataNodeRedactionCriteria {
+        /**
+         * The schema which is asked whether a configuration is sensitive.
+         */
+        private final KafkaConfigSchema configSchema;
+
+        public Normal(KafkaConfigSchema configSchema) {
+            this.configSchema = configSchema;
+        }
+
+        @Override
+        public boolean shouldRedactScram() {
+            return true;
+        }
+
+        @Override
+        public boolean shouldRedactConfig(ConfigResource.Type type, String key) {
+            return configSchema.isSensitive(type, key);
+        }
+    }
+
+    /**
+     * Redacts nothing: neither SCRAM data nor any configuration.
+     */
+    class Disabled implements MetadataNodeRedactionCriteria {
+        public static final Disabled INSTANCE = new Disabled();
+
+        @Override
+        public boolean shouldRedactScram() {
+            return false;
+        }
+
+        @Override
+        public boolean shouldRedactConfig(ConfigResource.Type type, String key) {
+            return false;
+        }
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/node/printer/NodeStringifier.java b/metadata/src/main/java/org/apache/kafka/image/node/printer/NodeStringifier.java
new file mode 100644
index 00000000000..073b6ad900d
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/node/printer/NodeStringifier.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node.printer;
+
+import java.util.ArrayDeque;
+
+/**
+ * A printer which renders the visited nodes into a single string. A node is
+ * written as its name followed by its contents in parentheses, and sibling items
+ * are separated by ", ". For example, entering node "a", printing "x" and "y",
+ * and leaving the node yields "a(x, y)". The result is read back with toString().
+ */
+public class NodeStringifier implements MetadataNodePrinter {
+    private final MetadataNodeRedactionCriteria redactionCriteria;
+    private final StringBuilder stringBuilder;
+
+    /**
+     * One entry per nesting level that is currently open. The top entry is the
+     * separator to write before the next item at the current level: "" before the
+     * first item, ", " before every later one.
+     */
+    private final ArrayDeque<String> prefixes;
+
+    /**
+     * Create a stringifier which uses the strict redaction criteria.
+     */
+    public NodeStringifier() {
+        this(MetadataNodeRedactionCriteria.Strict.INSTANCE);
+    }
+
+    /**
+     * Create a stringifier.
+     *
+     * @param redactionCriteria     The redaction criteria to report to nodes.
+     */
+    public NodeStringifier(MetadataNodeRedactionCriteria redactionCriteria) {
+        this.redactionCriteria = redactionCriteria;
+        this.stringBuilder = new StringBuilder();
+        this.prefixes = new ArrayDeque<>();
+        // The outermost level starts out with no separator.
+        prefixes.push("");
+    }
+
+    @Override
+    public MetadataNodeRedactionCriteria redactionCriteria() {
+        return redactionCriteria;
+    }
+
+    @Override
+    public void enterNode(String name) {
+        // Write the separator owed at the current level, and make sure that the
+        // next sibling at this level is preceded by ", ".
+        stringBuilder.append(prefixes.pop());
+        prefixes.push(", ");
+        stringBuilder.append(name).append("(");
+        // Open a new level; its first item needs no separator.
+        prefixes.push("");
+    }
+
+    @Override
+    public void leaveNode() {
+        stringBuilder.append(")");
+        // Discard the separator state of the level which was just closed.
+        prefixes.pop();
+    }
+
+    @Override
+    public void output(String text) {
+        stringBuilder.append(prefixes.pop()).append(text);
+        prefixes.push(", ");
+    }
+
+    /**
+     * @return          Everything which has been written so far.
+     */
+    @Override
+    public String toString() {
+        return stringBuilder.toString();
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
index 99174e973df..24ee7e3186a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java
@@ -73,7 +73,7 @@ public class KRaftMigrationZkWriter {
         handleConfigsSnapshot(image.configs());
         handleClientQuotasSnapshot(image.clientQuotas(), image.scram());
         operationConsumer.accept("Setting next producer ID", migrationState ->
-            migrationClient.writeProducerId(image.producerIds().highestSeenProducerId(), migrationState));
+            migrationClient.writeProducerId(image.producerIds().nextProducerId(), migrationState));
         handleAclsSnapshot(image.acls());
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
index 81945d12337..19875741ec2 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
@@ -42,6 +42,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 
 
@@ -58,6 +59,8 @@ public final class SnapshotFileReader implements AutoCloseable {
     private FileRecords fileRecords;
     private Iterator<FileChannelRecordBatch> batchIterator;
     private final MetadataRecordSerde serde = new MetadataRecordSerde();
+    private long lastOffset = -1L;
+    private volatile OptionalLong highWaterMark = OptionalLong.empty();
 
     public SnapshotFileReader(String snapshotPath, RaftClient.Listener<ApiMessageAndVersion> listener) {
         this.snapshotPath = snapshotPath;
@@ -98,6 +101,7 @@ public final class SnapshotFileReader implements AutoCloseable {
         } else {
             handleMetadataBatch(batch);
         }
+        lastOffset = batch.lastOffset();
         scheduleHandleNextBatch();
     }
 
@@ -116,6 +120,10 @@ public final class SnapshotFileReader implements AutoCloseable {
         });
     }
 
+    /**
+     * Returns the high water mark of this reader. It is empty until the shutdown
+     * event has run; that event sets it to the last offset of the most recently
+     * handled batch, or to -1 if no batch was handled. The backing field is
+     * volatile.
+     */
+    public OptionalLong highWaterMark() {
+        return highWaterMark;
+    }
+
     private void handleControlBatch(FileChannelRecordBatch batch) {
         for (Iterator<Record> iter = batch.iterator(); iter.hasNext(); ) {
             Record record = iter.next();
@@ -180,7 +188,9 @@ public final class SnapshotFileReader implements AutoCloseable {
     class ShutdownEvent implements EventQueue.Event {
         @Override
         public void run() throws Exception {
-            listener.beginShutdown();
+            // Expose the high water mark only once we've shut down.
+            highWaterMark = OptionalLong.of(lastOffset);
+
             if (fileRecords != null) {
                 fileRecords.close();
                 fileRecords = null;
diff --git a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
index 965311ce7ae..0f9a761972b 100644
--- a/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java
@@ -52,11 +52,11 @@ public class ConfigurationsImageTest {
         broker0Map.put("foo", "bar");
         broker0Map.put("baz", "quux");
         map1.put(new ConfigResource(BROKER, "0"),
-            new ConfigurationImage(broker0Map));
+            new ConfigurationImage(new ConfigResource(BROKER, "0"), broker0Map));
         Map<String, String> broker1Map = new HashMap<>();
         broker1Map.put("foobar", "foobaz");
         map1.put(new ConfigResource(BROKER, "1"),
-            new ConfigurationImage(broker1Map));
+            new ConfigurationImage(new ConfigResource(BROKER, "1"), broker1Map));
         IMAGE1 = new ConfigurationsImage(map1);
 
         DELTA1_RECORDS = new ArrayList<>();
@@ -74,12 +74,12 @@ public class ConfigurationsImageTest {
         Map<String, String> broker0Map2 = new HashMap<>();
         broker0Map2.put("baz", "quux");
         map2.put(new ConfigResource(BROKER, "0"),
-            new ConfigurationImage(broker0Map2));
+            new ConfigurationImage(new ConfigResource(BROKER, "0"), broker0Map2));
         Map<String, String> broker1Map2 = new HashMap<>();
         broker1Map2.put("foobar", "foobaz");
         broker1Map2.put("barfoo", "bazfoo");
         map2.put(new ConfigResource(BROKER, "1"),
-            new ConfigurationImage(broker1Map2));
+            new ConfigurationImage(new ConfigResource(BROKER, "1"), broker1Map2));
         IMAGE2 = new ConfigurationsImage(map2);
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/node/ClientQuotasImageNodeTest.java b/metadata/src/test/java/org/apache/kafka/image/node/ClientQuotasImageNodeTest.java
new file mode 100644
index 00000000000..7abf08fbb30
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/node/ClientQuotasImageNodeTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+/**
+ * Tests the string escaping of ClientQuotasImageNode and the conversion of client
+ * quota entities to node names and back.
+ */
+@Timeout(value = 40)
+public class ClientQuotasImageNodeTest {
+    @Test
+    public void testEscapeEmptyString() {
+        String escaped = ClientQuotasImageNode.escape("");
+        assertEquals("", escaped);
+    }
+
+    @Test
+    public void testEscapeNormalString() {
+        String escaped = ClientQuotasImageNode.escape("abracadabra");
+        assertEquals("abracadabra", escaped);
+    }
+
+    @Test
+    public void testEscapeBackslashes() {
+        String escaped = ClientQuotasImageNode.escape("\\foo\\bar");
+        assertEquals("\\\\foo\\\\bar", escaped);
+    }
+
+    @Test
+    public void testEscapeParentheses() {
+        String escaped = ClientQuotasImageNode.escape("(bob's name)");
+        assertEquals("\\(bob's name\\)", escaped);
+    }
+
+    /**
+     * Check that the entity is encoded as the expected string, and that decoding
+     * that string gives back an equal entity.
+     */
+    private void entityToStringRoundTrip(ClientQuotaEntity entity, String expected) {
+        String encoded = ClientQuotasImageNode.clientQuotaEntityToString(entity);
+        assertEquals(expected, encoded);
+        assertEquals(entity, ClientQuotasImageNode.decodeEntity(encoded));
+    }
+
+    @Test
+    public void clientIdEntityRoundTrip() {
+        ClientQuotaEntity entity = new ClientQuotaEntity(singletonMap("client-id", "foo"));
+        entityToStringRoundTrip(entity, "clientId(foo)");
+    }
+
+    @Test
+    public void defaultClientIdEntityRoundTrip() {
+        ClientQuotaEntity entity = new ClientQuotaEntity(singletonMap("client-id", ""));
+        entityToStringRoundTrip(entity, "clientId()");
+    }
+
+    @Test
+    public void userEntityRoundTrip() {
+        ClientQuotaEntity entity = new ClientQuotaEntity(singletonMap("user", "my-user-name"));
+        entityToStringRoundTrip(entity, "user(my-user-name)");
+    }
+
+    @Test
+    public void defaultUserEntityRoundTrip() {
+        ClientQuotaEntity entity = new ClientQuotaEntity(singletonMap("user", ""));
+        entityToStringRoundTrip(entity, "user()");
+    }
+
+    @Test
+    public void clientIdAndUserEntityRoundTrip() {
+        Map<String, String> entries = new HashMap<>();
+        entries.put("client-id", "reports12345");
+        entries.put("user", "bob");
+        ClientQuotaEntity entity = new ClientQuotaEntity(entries);
+        entityToStringRoundTrip(entity, "clientId(reports12345)_user(bob)");
+    }
+
+    @Test
+    public void ipEntityRoundTrip() {
+        ClientQuotaEntity entity = new ClientQuotaEntity(singletonMap("ip", "127.0.0.1"));
+        entityToStringRoundTrip(entity, "ip(127.0.0.1)");
+    }
+
+    @Test
+    public void defaultIpEntityRoundTrip() {
+        ClientQuotaEntity entity = new ClientQuotaEntity(singletonMap("ip", ""));
+        entityToStringRoundTrip(entity, "ip()");
+    }
+
+    @Test
+    public void testUserEntityWithBackslashesInNameRoundTrip() {
+        ClientQuotaEntity entity = new ClientQuotaEntity(singletonMap("user", "foo\\bar"));
+        entityToStringRoundTrip(entity, "user(foo\\\\bar)");
+    }
+
+    @Test
+    public void testClientIdEntityWithParentheses() {
+        ClientQuotaEntity entity = new ClientQuotaEntity(singletonMap("client-id", "(this )one)"));
+        entityToStringRoundTrip(entity, "clientId(\\(this \\)one\\))");
+    }
+
+    @Test
+    public void testErrorOnInvalidEmptyEntityName() {
+        RuntimeException thrown = assertThrows(RuntimeException.class,
+            () -> ClientQuotasImageNode.clientQuotaEntityToString(
+                new ClientQuotaEntity(emptyMap())));
+        assertEquals("Invalid empty entity", thrown.getMessage());
+    }
+
+    @Test
+    public void testErrorOnInvalidEntityType() {
+        RuntimeException thrown = assertThrows(RuntimeException.class,
+            () -> ClientQuotasImageNode.clientQuotaEntityToString(
+                new ClientQuotaEntity(singletonMap("foobar", "baz"))));
+        assertEquals("Invalid entity type foobar", thrown.getMessage());
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/node/ConfigurationImageNodeTest.java b/metadata/src/test/java/org/apache/kafka/image/node/ConfigurationImageNodeTest.java
new file mode 100644
index 00000000000..d94587a03b5
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/node/ConfigurationImageNodeTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.image.ConfigurationImage;
+import org.apache.kafka.image.node.printer.MetadataNodeRedactionCriteria;
+import org.apache.kafka.image.node.printer.NodeStringifier;
+import org.apache.kafka.metadata.KafkaConfigSchema;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+
+@Timeout(value = 40)
+public class ConfigurationImageNodeTest {
+    private static final MetadataNodeRedactionCriteria NORMAL;
+    private static final ConfigurationImageNode NODE;
+
+    static {
+        ConfigDef brokerDef = new ConfigDef();
+        brokerDef.define("non.secret", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "baz");
+        brokerDef.define("also.non.secret", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "baz");
+        brokerDef.define("secret.config", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "baz");
+        NORMAL = new MetadataNodeRedactionCriteria.Normal(
+            new KafkaConfigSchema(Collections.singletonMap(BROKER, brokerDef), Collections.emptyMap()));
+        // The image sets only two of the three keys that the schema defines.
+        Map<String, String> brokerConfigs = new HashMap<>();
+        brokerConfigs.put("non.secret", "baaz");
+        brokerConfigs.put("secret.config", "123");
+        NODE = new ConfigurationImageNode(new ConfigurationImage(new ConfigResource(BROKER, ""), brokerConfigs));
+    }
+
+    private static String printChild(String name) {
+        NodeStringifier printer = new NodeStringifier(NORMAL);
+        NODE.child(name).print(printer);
+        return printer.toString();
+    }
+
+    @Test
+    public void testNonSecretChild() {
+        assertEquals("baaz", printChild("non.secret"));
+    }
+
+    @Test
+    public void testSecretChild() {
+        assertEquals("[redacted]", printChild("secret.config"));
+    }
+
+    @Test
+    public void testUnknownChild() {
+        assertNull(NODE.child("also.non.secret"));
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/node/ConfigurationsImageNodeTest.java b/metadata/src/test/java/org/apache/kafka/image/node/ConfigurationsImageNodeTest.java
new file mode 100644
index 00000000000..68bb2d912b0
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/node/ConfigurationsImageNodeTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.image.ConfigurationImage;
+import org.apache.kafka.image.ConfigurationsImage;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+@Timeout(value = 40)
+public class ConfigurationsImageNodeTest {
+    private static final ConfigurationsImageNode NODE;
+
+    static {
+        // Every resource is given the same single-entry configuration.
+        List<ConfigResource> resources = Arrays.asList(
+            new ConfigResource(BROKER, ""),
+            new ConfigResource(BROKER, "0"),
+            new ConfigResource(TOPIC, ""),
+            new ConfigResource(TOPIC, "foobar"),
+            new ConfigResource(TOPIC, ":colons:"),
+            new ConfigResource(TOPIC, "__internal"));
+        Map<ConfigResource, ConfigurationImage> imagesByResource = new HashMap<>();
+        resources.forEach(resource -> imagesByResource.put(resource,
+            new ConfigurationImage(resource, Collections.singletonMap("foo", "bar"))));
+        NODE = new ConfigurationsImageNode(new ConfigurationsImage(imagesByResource));
+    }
+
+    @Test
+    public void testNodeChildNames() {
+        List<String> sortedNames = new ArrayList<>(NODE.childNames());
+        Collections.sort(sortedNames);
+        List<String> expectedNames = Arrays.asList(
+            "BROKER",
+            "BROKER:0",
+            "TOPIC",
+            "TOPIC::colons:",
+            "TOPIC:__internal",
+            "TOPIC:foobar");
+        assertEquals(expectedNames, sortedNames);
+    }
+
+    @Test
+    public void testNodeChildNameParsing() {
+        // Names are sorted before parsing, which fixes the order of the parsed resources.
+        List<ConfigResource> expectedResources = Arrays.asList(
+            new ConfigResource(BROKER, ""),
+            new ConfigResource(BROKER, "0"),
+            new ConfigResource(TOPIC, ""),
+            new ConfigResource(TOPIC, ":colons:"),
+            new ConfigResource(TOPIC, "__internal"),
+            new ConfigResource(TOPIC, "foobar"));
+        List<ConfigResource> parsedResources = NODE.childNames().stream().sorted().
+            map(ConfigurationsImageNode::resourceFromName).collect(Collectors.toList());
+        assertEquals(expectedResources, parsedResources);
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/node/ScramCredentialDataNodeTest.java b/metadata/src/test/java/org/apache/kafka/image/node/ScramCredentialDataNodeTest.java
new file mode 100644
index 00000000000..b8f4cf78058
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/node/ScramCredentialDataNodeTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node;
+
+import org.apache.kafka.image.node.printer.MetadataNodeRedactionCriteria.Disabled;
+import org.apache.kafka.image.node.printer.NodeStringifier;
+import org.apache.kafka.metadata.ScramCredentialData;
+import org.apache.kafka.server.util.MockRandom;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+public class ScramCredentialDataNodeTest {
+    private static final ScramCredentialData DATA;
+
+    static {
+        // Arguments are evaluated left to right, so the salt, stored key and server key are drawn
+        // from MockRandom in that order; testPrintUnredacted expects the resulting hex strings.
+        MockRandom random = new MockRandom();
+        DATA = new ScramCredentialData(next16(random), next16(random), next16(random), 16);
+    }
+
+    private static byte[] next16(MockRandom random) {
+        byte[] bytes = new byte[16];
+        random.nextBytes(bytes);
+        return bytes;
+    }
+
+    @Test
+    public void testPrintRedacted() {
+        NodeStringifier redactingPrinter = new NodeStringifier();
+        new ScramCredentialDataNode(DATA).print(redactingPrinter);
+        String expected = "ScramCredentialData(salt=[redacted], storedKey=[redacted], " +
+            "serverKey=[redacted], iterations=[redacted])";
+        assertEquals(expected, redactingPrinter.toString());
+    }
+
+    @Test
+    public void testPrintUnredacted() {
+        NodeStringifier plainPrinter = new NodeStringifier(Disabled.INSTANCE);
+        new ScramCredentialDataNode(DATA).print(plainPrinter);
+        assertEquals("ScramCredentialData(" +
+            "salt=4f1d6ea31e58c5ad3aaeb3266f55cce6, " +
+            "storedKey=3cfa1c3421b512d1d1dfc3355138b4ad, " +
+            "serverKey=2d9781209073e8d03aee3cbc63a1d4ca, " +
+            "iterations=16)", plainPrinter.toString());
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/node/printer/MetadataNodeRedactionCriteriaTest.java b/metadata/src/test/java/org/apache/kafka/image/node/printer/MetadataNodeRedactionCriteriaTest.java
new file mode 100644
index 00000000000..8f870cd9ce4
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/node/printer/MetadataNodeRedactionCriteriaTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node.printer;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.metadata.KafkaConfigSchema;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+public class MetadataNodeRedactionCriteriaTest {
+    private static final MetadataNodeRedactionCriteria.Strict STRICT =
+        MetadataNodeRedactionCriteria.Strict.INSTANCE;
+    private static final MetadataNodeRedactionCriteria.Normal NORMAL =
+        new MetadataNodeRedactionCriteria.Normal(createSchema()); // the only schema-backed criteria
+    private static final MetadataNodeRedactionCriteria.Disabled DISABLED =
+        MetadataNodeRedactionCriteria.Disabled.INSTANCE;
+
+    private static KafkaConfigSchema createSchema() {
+        ConfigDef brokerDef = new ConfigDef();
+        brokerDef.define("non.secret", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "baz");
+        brokerDef.define("secret.config", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "baz");
+        ConfigDef topicDef = new ConfigDef();
+        topicDef.define("topic.secret.config", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "abc");
+        Map<ConfigResource.Type, ConfigDef> defsByType = new HashMap<>();
+        defsByType.put(BROKER, brokerDef);
+        defsByType.put(TOPIC, topicDef);
+        return new KafkaConfigSchema(defsByType, Collections.emptyMap());
+    }
+
+    @Test
+    public void testStrictRedactsScram() {
+        assertTrue(STRICT.shouldRedactScram());
+    }
+
+    @Test
+    public void testNormalRedactsScram() {
+        assertTrue(NORMAL.shouldRedactScram());
+    }
+
+    @Test
+    public void testDisabledDoesNotRedactScram() {
+        assertFalse(DISABLED.shouldRedactScram());
+    }
+
+    @Test
+    public void testStrictRedactsNonSensitiveConfig() {
+        assertTrue(STRICT.shouldRedactConfig(BROKER, "non.secret"));
+    }
+
+    @Test
+    public void testNormalDoesNotRedactNonSensitiveConfig() {
+        assertFalse(NORMAL.shouldRedactConfig(BROKER, "non.secret"));
+    }
+
+    @Test
+    public void testDisabledDoesNotRedactNonSensitiveConfig() {
+        assertFalse(DISABLED.shouldRedactConfig(BROKER, "non.secret"));
+    }
+
+    @Test
+    public void testStrictRedactsSensitiveConfig() {
+        assertTrue(STRICT.shouldRedactConfig(BROKER, "secret.config"));
+    }
+
+    @Test
+    public void testNormalDoesRedactsSensitiveConfig() {
+        assertTrue(NORMAL.shouldRedactConfig(BROKER, "secret.config"));
+    }
+
+    @Test
+    public void testDisabledDoesNotRedactSensitiveConfig() {
+        assertFalse(DISABLED.shouldRedactConfig(BROKER, "secret.config"));
+    }
+
+    @Test
+    public void testStrictRedactsUnknownConfig() {
+        assertTrue(STRICT.shouldRedactConfig(BROKER, "unknown.config"));
+    }
+
+    @Test
+    public void testNormalDoesRedactsUnknownConfig() {
+        assertTrue(NORMAL.shouldRedactConfig(BROKER, "unknown.config"));
+    }
+
+    @Test
+    public void testDisabledDoesNotRedactUnknownConfig() {
+        assertFalse(DISABLED.shouldRedactConfig(BROKER, "unknown.config"));
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/node/printer/NodeStringifierTest.java b/metadata/src/test/java/org/apache/kafka/image/node/printer/NodeStringifierTest.java
new file mode 100644
index 00000000000..a5d5b6c334a
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/node/printer/NodeStringifierTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.node.printer;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+@Timeout(value = 40)
+public class NodeStringifierTest {
+    @Test
+    public void testOutput() {
+        NodeStringifier printer = new NodeStringifier();
+        printer.output("testing 123");
+        printer.output("again");
+        assertEquals("testing 123, again", printer.toString()); // outputs are joined with ", "
+    }
+
+    @Test
+    public void testEnterNode() {
+        NodeStringifier printer = new NodeStringifier();
+        printer.enterNode("foo");
+        printer.leaveNode();
+        assertEquals("foo()", printer.toString()); // a childless node still gets parentheses
+    }
+
+    @Test
+    public void testNesting() {
+        NodeStringifier printer = new NodeStringifier();
+        printer.enterNode("foo");
+        for (String childName : new String[] {"bar", "baz"}) {
+            printer.enterNode(childName);
+            printer.leaveNode();
+        }
+        printer.leaveNode();
+        assertEquals("foo(bar(), baz())", printer.toString());
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java b/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java
index aa4d4ea56cf..b7fe6161934 100644
--- a/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java
+++ b/shell/src/main/java/org/apache/kafka/shell/InteractiveShell.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.shell;
 
+import org.apache.kafka.shell.command.CommandUtils;
+import org.apache.kafka.shell.command.Commands;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 import org.jline.reader.Completer;
 import org.jline.reader.EndOfFileException;
@@ -41,14 +44,14 @@ import java.util.NoSuchElementException;
 import java.util.Optional;
 
 /**
- * The Kafka metadata shell.
+ * Handles running the Kafka metadata shell in interactive mode, where we accept input in real time.
  */
 public final class InteractiveShell implements AutoCloseable {
     static class MetadataShellCompleter implements Completer {
-        private final MetadataNodeManager nodeManager;
+        private final MetadataShellState state;
 
-        MetadataShellCompleter(MetadataNodeManager nodeManager) {
-            this.nodeManager = nodeManager;
+        MetadataShellCompleter(MetadataShellState state) {
+            this.state = state;
         }
 
         @Override
@@ -69,7 +72,7 @@ public final class InteractiveShell implements AutoCloseable {
                     return;
                 }
                 try {
-                    type.completeNext(nodeManager, nextWords, candidates);
+                    type.completeNext(state, nextWords, candidates);
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -77,22 +80,22 @@ public final class InteractiveShell implements AutoCloseable {
         }
     }
 
-    private final MetadataNodeManager nodeManager;
+    private final MetadataShellState state;
     private final Terminal terminal;
     private final Parser parser;
     private final History history;
     private final MetadataShellCompleter completer;
     private final LineReader reader;
 
-    public InteractiveShell(MetadataNodeManager nodeManager) throws IOException {
-        this.nodeManager = nodeManager;
+    public InteractiveShell(MetadataShellState state) throws IOException {
+        this.state = state;
         TerminalBuilder builder = TerminalBuilder.builder().
             system(true).
             nativeSignals(true);
         this.terminal = builder.build();
         this.parser = new DefaultParser();
         this.history = new DefaultHistory();
-        this.completer = new MetadataShellCompleter(nodeManager);
+        this.completer = new MetadataShellCompleter(state);
         this.reader = LineReaderBuilder.builder().
             terminal(terminal).
             parser(parser).
@@ -111,7 +114,7 @@ public final class InteractiveShell implements AutoCloseable {
                 reader.readLine(">> ");
                 ParsedLine parsedLine = reader.getParsedLine();
                 Commands.Handler handler = commands.parseCommand(parsedLine.words());
-                handler.run(Optional.of(this), terminal.writer(), nodeManager);
+                handler.run(Optional.of(this), terminal.writer(), state);
                 terminal.writer().flush();
             } catch (UserInterruptException eof) {
                 // Handle the user pressing control-C.
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNode.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNode.java
deleted file mode 100644
index ad0b3cbc0a0..00000000000
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNode.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.shell;
-
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * A node in the metadata tool.
- */
-public interface MetadataNode {
-    class DirectoryNode implements MetadataNode {
-        private final TreeMap<String, MetadataNode> children = new TreeMap<>();
-
-        public DirectoryNode mkdirs(String... names) {
-            if (names.length == 0) {
-                throw new RuntimeException("Invalid zero-length path");
-            }
-            DirectoryNode node = this;
-            for (int i = 0; i < names.length; i++) {
-                MetadataNode nextNode = node.children.get(names[i]);
-                if (nextNode == null) {
-                    nextNode = new DirectoryNode();
-                    node.children.put(names[i], nextNode);
-                } else {
-                    if (!(nextNode instanceof DirectoryNode)) {
-                        throw new NotDirectoryException();
-                    }
-                }
-                node = (DirectoryNode) nextNode;
-            }
-            return node;
-        }
-
-        public void rmrf(String... names) {
-            if (names.length == 0) {
-                throw new RuntimeException("Invalid zero-length path");
-            }
-            DirectoryNode node = this;
-            for (int i = 0; i < names.length - 1; i++) {
-                MetadataNode nextNode = node.children.get(names[i]);
-                if (!(nextNode instanceof DirectoryNode)) {
-                    throw new RuntimeException("Unable to locate directory /" +
-                        String.join("/", names));
-                }
-                node = (DirectoryNode) nextNode;
-            }
-            node.children.remove(names[names.length - 1]);
-        }
-
-        public FileNode create(String name) {
-            MetadataNode node = children.get(name);
-            if (node == null) {
-                node = new FileNode();
-                children.put(name, node);
-            } else {
-                if (!(node instanceof FileNode)) {
-                    throw new NotFileException();
-                }
-            }
-            return (FileNode) node;
-        }
-
-        public MetadataNode child(String component) {
-            return children.get(component);
-        }
-
-        public NavigableMap<String, MetadataNode> children() {
-            return children;
-        }
-
-        public void addChild(String name, DirectoryNode child) {
-            children.put(name, child);
-        }
-
-        public DirectoryNode directory(String... names) {
-            if (names.length == 0) {
-                throw new RuntimeException("Invalid zero-length path");
-            }
-            DirectoryNode node = this;
-            for (int i = 0; i < names.length; i++) {
-                MetadataNode nextNode = node.children.get(names[i]);
-                if (!(nextNode instanceof DirectoryNode)) {
-                    throw new RuntimeException("Unable to locate directory /" +
-                        String.join("/", names));
-                }
-                node = (DirectoryNode) nextNode;
-            }
-            return node;
-        }
-
-        public FileNode file(String... names) {
-            if (names.length == 0) {
-                throw new RuntimeException("Invalid zero-length path");
-            }
-            DirectoryNode node = this;
-            for (int i = 0; i < names.length - 1; i++) {
-                MetadataNode nextNode = node.children.get(names[i]);
-                if (!(nextNode instanceof DirectoryNode)) {
-                    throw new RuntimeException("Unable to locate file /" +
-                        String.join("/", names));
-                }
-                node = (DirectoryNode) nextNode;
-            }
-            MetadataNode nextNode = node.child(names[names.length -  1]);
-            if (!(nextNode instanceof FileNode)) {
-                throw new RuntimeException("Unable to locate file /" +
-                    String.join("/", names));
-            }
-            return (FileNode) nextNode;
-        }
-    }
-
-    class FileNode implements MetadataNode {
-        private String contents;
-
-        void setContents(String contents) {
-            this.contents = contents;
-        }
-
-        String contents() {
-            return contents;
-        }
-    }
-}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java b/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
deleted file mode 100644
index 583547e2ed3..00000000000
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.shell;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.metadata.AccessControlEntryRecord;
-import org.apache.kafka.common.metadata.AccessControlEntryRecordJsonConverter;
-import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
-import org.apache.kafka.common.metadata.ClientQuotaRecord;
-import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
-import org.apache.kafka.common.metadata.ConfigRecord;
-import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.FeatureLevelRecordJsonConverter;
-import org.apache.kafka.common.metadata.FenceBrokerRecord;
-import org.apache.kafka.common.metadata.MetadataRecordType;
-import org.apache.kafka.common.metadata.PartitionChangeRecord;
-import org.apache.kafka.common.metadata.PartitionRecord;
-import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
-import org.apache.kafka.common.metadata.ProducerIdsRecord;
-import org.apache.kafka.common.metadata.RegisterBrokerRecord;
-import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
-import org.apache.kafka.common.metadata.RemoveTopicRecord;
-import org.apache.kafka.common.metadata.TopicRecord;
-import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
-import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
-import org.apache.kafka.common.protocol.ApiMessage;
-import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
-import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
-import org.apache.kafka.queue.EventQueue;
-import org.apache.kafka.queue.KafkaEventQueue;
-import org.apache.kafka.raft.Batch;
-import org.apache.kafka.raft.BatchReader;
-import org.apache.kafka.raft.LeaderAndEpoch;
-import org.apache.kafka.raft.RaftClient;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.shell.MetadataNode.DirectoryNode;
-import org.apache.kafka.shell.MetadataNode.FileNode;
-import org.apache.kafka.snapshot.SnapshotReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-
-import static org.apache.kafka.metadata.LeaderRecoveryState.NO_CHANGE;
-
-/**
- * Maintains the in-memory metadata for the metadata tool.
- */
-public final class MetadataNodeManager implements AutoCloseable {
-    private static final int NO_LEADER_CHANGE = -2;
-
-    private static final Logger log = LoggerFactory.getLogger(MetadataNodeManager.class);
-
-    public static class Data {
-        private final DirectoryNode root = new DirectoryNode();
-        private String workingDirectory = "/";
-
-        public DirectoryNode root() {
-            return root;
-        }
-
-        public String workingDirectory() {
-            return workingDirectory;
-        }
-
-        public void setWorkingDirectory(String workingDirectory) {
-            this.workingDirectory = workingDirectory;
-        }
-    }
-
-    class LogListener implements RaftClient.Listener<ApiMessageAndVersion> {
-        @Override
-        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
-            try {
-                while (reader.hasNext()) {
-                    Batch<ApiMessageAndVersion> batch = reader.next();
-                    log.debug("handleCommits " + batch.records() + " at offset " + batch.lastOffset());
-                    DirectoryNode dir = data.root.mkdirs("metadataQuorum");
-                    dir.create("offset").setContents(String.valueOf(batch.lastOffset()));
-                    for (ApiMessageAndVersion messageAndVersion : batch.records()) {
-                        handleMessage(messageAndVersion.message());
-                    }
-                }
-            } finally {
-                reader.close();
-            }
-        }
-
-        @Override
-        public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
-            try {
-                while (reader.hasNext()) {
-                    Batch<ApiMessageAndVersion> batch = reader.next();
-                    for (ApiMessageAndVersion messageAndVersion : batch) {
-                        handleMessage(messageAndVersion.message());
-                    }
-                }
-            } finally {
-                reader.close();
-            }
-        }
-
-        @Override
-        public void handleLeaderChange(LeaderAndEpoch leader) {
-            appendEvent("handleNewLeader", () -> {
-                log.debug("handleNewLeader " + leader);
-                DirectoryNode dir = data.root.mkdirs("metadataQuorum");
-                dir.create("leader").setContents(leader.toString());
-            }, null);
-        }
-
-        @Override
-        public void beginShutdown() {
-            log.debug("Metadata log listener sent beginShutdown");
-        }
-    }
-
-    private final Data data = new Data();
-    private final LogListener logListener = new LogListener();
-    private final ObjectMapper objectMapper;
-    private final KafkaEventQueue queue;
-
-    public MetadataNodeManager() {
-        this.objectMapper = new ObjectMapper();
-        this.objectMapper.registerModule(new Jdk8Module());
-        this.queue = new KafkaEventQueue(Time.SYSTEM,
-            new LogContext("[node-manager-event-queue] "), "");
-    }
-
-    public void setup() throws Exception {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        appendEvent("createShellNodes", () -> {
-            DirectoryNode directory = data.root().mkdirs("local");
-            directory.create("version").setContents(AppInfoParser.getVersion());
-            directory.create("commitId").setContents(AppInfoParser.getCommitId());
-            future.complete(null);
-        }, future);
-        future.get();
-    }
-
-    public LogListener logListener() {
-        return logListener;
-    }
-
-    // VisibleForTesting
-    Data getData() {
-        return data;
-    }
-
-    @Override
-    public void close() throws Exception {
-        queue.close();
-    }
-
-    public void visit(Consumer<Data> consumer) throws Exception {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        appendEvent("visit", () -> {
-            consumer.accept(data);
-            future.complete(null);
-        }, future);
-        future.get();
-    }
-
-    private void appendEvent(String name, Runnable runnable, CompletableFuture<?> future) {
-        queue.append(new EventQueue.Event() {
-            @Override
-            public void run() throws Exception {
-                runnable.run();
-            }
-
-            @Override
-            public void handleException(Throwable e) {
-                log.error("Unexpected error while handling event " + name, e);
-                if (future != null) {
-                    future.completeExceptionally(e);
-                }
-            }
-        });
-    }
-
-    // VisibleForTesting
-    void handleMessage(ApiMessage message) {
-        try {
-            MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
-            handleCommitImpl(type, message);
-        } catch (Exception e) {
-            log.error("Error processing record of type " + message.apiKey(), e);
-        }
-    }
-
-    private void handleCommitImpl(MetadataRecordType type, ApiMessage message)
-        throws Exception {
-        switch (type) {
-            case REGISTER_BROKER_RECORD: {
-                DirectoryNode brokersNode = data.root.mkdirs("brokers");
-                RegisterBrokerRecord record = (RegisterBrokerRecord) message;
-                DirectoryNode brokerNode = brokersNode.
-                    mkdirs(Integer.toString(record.brokerId()));
-                FileNode registrationNode = brokerNode.create("registration");
-                registrationNode.setContents(record.toString());
-                brokerNode.create("isFenced").setContents("true");
-                break;
-            }
-            case UNREGISTER_BROKER_RECORD: {
-                UnregisterBrokerRecord record = (UnregisterBrokerRecord) message;
-                data.root.rmrf("brokers", Integer.toString(record.brokerId()));
-                break;
-            }
-            case TOPIC_RECORD: {
-                TopicRecord record = (TopicRecord) message;
-                DirectoryNode topicsDirectory = data.root.mkdirs("topics");
-                DirectoryNode topicDirectory = topicsDirectory.mkdirs(record.name());
-                topicDirectory.create("id").setContents(record.topicId().toString());
-                topicDirectory.create("name").setContents(record.name().toString());
-                DirectoryNode topicIdsDirectory = data.root.mkdirs("topicIds");
-                topicIdsDirectory.addChild(record.topicId().toString(), topicDirectory);
-                break;
-            }
-            case PARTITION_RECORD: {
-                PartitionRecord record = (PartitionRecord) message;
-                DirectoryNode topicDirectory =
-                    data.root.mkdirs("topicIds").mkdirs(record.topicId().toString());
-                DirectoryNode partitionDirectory =
-                    topicDirectory.mkdirs(Integer.toString(record.partitionId()));
-                JsonNode node = PartitionRecordJsonConverter.
-                    write(record, PartitionRecord.HIGHEST_SUPPORTED_VERSION);
-                partitionDirectory.create("data").setContents(node.toPrettyString());
-                break;
-            }
-            case CONFIG_RECORD: {
-                ConfigRecord record = (ConfigRecord) message;
-                String typeString = "";
-                switch (ConfigResource.Type.forId(record.resourceType())) {
-                    case BROKER:
-                        typeString = "broker";
-                        break;
-                    case TOPIC:
-                        typeString = "topic";
-                        break;
-                    default:
-                        throw new RuntimeException("Error processing CONFIG_RECORD: " +
-                            "Can't handle ConfigResource.Type " + record.resourceType());
-                }
-                DirectoryNode configDirectory = data.root.mkdirs("configs").
-                    mkdirs(typeString).mkdirs(record.resourceName().isEmpty() ? "<default>" : record.resourceName());
-                if (record.value() == null) {
-                    configDirectory.rmrf(record.name());
-                } else {
-                    configDirectory.create(record.name()).setContents(record.value());
-                }
-                break;
-            }
-            case PARTITION_CHANGE_RECORD: {
-                PartitionChangeRecord record = (PartitionChangeRecord) message;
-                FileNode file = data.root.file("topicIds", record.topicId().toString(),
-                    Integer.toString(record.partitionId()), "data");
-                JsonNode node = objectMapper.readTree(file.contents());
-                PartitionRecord partition = PartitionRecordJsonConverter.
-                    read(node, PartitionRecord.HIGHEST_SUPPORTED_VERSION);
-                if (record.isr() != null) {
-                    partition.setIsr(record.isr());
-                }
-                if (record.leader() != NO_LEADER_CHANGE) {
-                    partition.setLeader(record.leader());
-                    partition.setLeaderEpoch(partition.leaderEpoch() + 1);
-                }
-                if (record.leaderRecoveryState() != NO_CHANGE) {
-                    partition.setLeaderRecoveryState(record.leaderRecoveryState());
-                }
-                partition.setPartitionEpoch(partition.partitionEpoch() + 1);
-                file.setContents(PartitionRecordJsonConverter.write(partition,
-                    PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
-                break;
-            }
-            case FENCE_BROKER_RECORD: {
-                FenceBrokerRecord record = (FenceBrokerRecord) message;
-                data.root.mkdirs("brokers", Integer.toString(record.id())).
-                    create("isFenced").setContents("true");
-                break;
-            }
-            case UNFENCE_BROKER_RECORD: {
-                UnfenceBrokerRecord record = (UnfenceBrokerRecord) message;
-                data.root.mkdirs("brokers", Integer.toString(record.id())).
-                    create("isFenced").setContents("false");
-                break;
-            }
-            case BROKER_REGISTRATION_CHANGE_RECORD: {
-                BrokerRegistrationChangeRecord record = (BrokerRegistrationChangeRecord) message;
-                BrokerRegistrationFencingChange fencingChange =
-                    BrokerRegistrationFencingChange.fromValue(record.fenced()).get();
-                if (fencingChange != BrokerRegistrationFencingChange.NONE) {
-                    data.root.mkdirs("brokers", Integer.toString(record.brokerId()))
-                        .create("isFenced").setContents(Boolean.toString(fencingChange.asBoolean().get()));
-                }
-                BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
-                    BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).get();
-                if (inControlledShutdownChange != BrokerRegistrationInControlledShutdownChange.NONE) {
-                    data.root.mkdirs("brokers", Integer.toString(record.brokerId()))
-                        .create("inControlledShutdown").setContents(Boolean.toString(inControlledShutdownChange.asBoolean().get()));
-                }
-                break;
-            }
-            case REMOVE_TOPIC_RECORD: {
-                RemoveTopicRecord record = (RemoveTopicRecord) message;
-                DirectoryNode topicsDirectory =
-                    data.root.directory("topicIds", record.topicId().toString());
-                String name = topicsDirectory.file("name").contents();
-                data.root.rmrf("topics", name);
-                data.root.rmrf("topicIds", record.topicId().toString());
-                break;
-            }
-            case CLIENT_QUOTA_RECORD: {
-                ClientQuotaRecord record = (ClientQuotaRecord) message;
-                List<String> directories = clientQuotaRecordDirectories(record.entity());
-                DirectoryNode node = data.root;
-                for (String directory : directories) {
-                    node = node.mkdirs(directory);
-                }
-                if (record.remove())
-                    node.rmrf(record.key());
-                else
-                    node.create(record.key()).setContents(record.value() + "");
-                break;
-            }
-            case PRODUCER_IDS_RECORD: {
-                ProducerIdsRecord record = (ProducerIdsRecord) message;
-                DirectoryNode producerIds = data.root.mkdirs("producerIds");
-                producerIds.create("lastBlockBrokerId").setContents(record.brokerId() + "");
-                producerIds.create("lastBlockBrokerEpoch").setContents(record.brokerEpoch() + "");
-
-                producerIds.create("nextBlockStartId").setContents(record.nextProducerId() + "");
-                break;
-            }
-            case ACCESS_CONTROL_ENTRY_RECORD: {
-                AccessControlEntryRecord record = (AccessControlEntryRecord) message;
-                DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
-                FileNode file = acls.create(record.id().toString());
-                file.setContents(AccessControlEntryRecordJsonConverter.write(record,
-                    AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
-                break;
-            }
-            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: {
-                RemoveAccessControlEntryRecord record = (RemoveAccessControlEntryRecord) message;
-                DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
-                acls.rmrf(record.id().toString());
-                break;
-            }
-            case FEATURE_LEVEL_RECORD: {
-                FeatureLevelRecord record = (FeatureLevelRecord) message;
-                DirectoryNode features = data.root.mkdirs("features");
-                if (record.featureLevel() == 0) {
-                    features.rmrf(record.name());
-                } else {
-                    FileNode file = features.create(record.name());
-                    file.setContents(FeatureLevelRecordJsonConverter.write(record,
-                        FeatureLevelRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
-                }
-                break;
-            }
-            case NO_OP_RECORD: {
-                break;
-            }
-            default:
-                throw new RuntimeException("Unhandled metadata record type");
-        }
-    }
-
-    static List<String> clientQuotaRecordDirectories(List<EntityData> entityData) {
-        List<String> result = new ArrayList<>();
-        result.add("client-quotas");
-        TreeMap<String, EntityData> entries = new TreeMap<>();
-        entityData.forEach(e -> entries.put(e.entityType(), e));
-        for (Map.Entry<String, EntityData> entry : entries.entrySet()) {
-            result.add(entry.getKey());
-            result.add(entry.getValue().entityName() == null ?
-                "<default>" : entry.getValue().entityName());
-        }
-        return result;
-    }
-}
diff --git a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
index d68e1923bc8..dd6a8180ecc 100644
--- a/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
+++ b/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
@@ -24,8 +24,14 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.image.loader.MetadataLoader;
 import org.apache.kafka.metadata.util.SnapshotFileReader;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.server.fault.LoggingFaultHandler;
+import org.apache.kafka.shell.command.Commands;
+import org.apache.kafka.shell.state.MetadataShellPublisher;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,78 +41,113 @@ import java.io.PrintWriter;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 
 /**
- * The Kafka metadata shell.
+ * The Kafka metadata shell entry point.
  */
 public final class MetadataShell {
     private static final Logger log = LoggerFactory.getLogger(MetadataShell.class);
 
     public static class Builder {
-        private String snapshotPath;
+        private KafkaRaftManager<ApiMessageAndVersion> raftManager = null;
+        private String snapshotPath = null;
+        private FaultHandler faultHandler = new LoggingFaultHandler("shell", () -> { });
+
+        public Builder setRaftManager(KafkaRaftManager<ApiMessageAndVersion> raftManager) {
+            this.raftManager = raftManager;
+            return this;
+        }
 
         public Builder setSnapshotPath(String snapshotPath) {
             this.snapshotPath = snapshotPath;
             return this;
         }
 
-        public MetadataShell build() throws Exception {
-            if (snapshotPath == null) {
-                throw new RuntimeException("You must supply the log path via --snapshot");
-            }
-            MetadataNodeManager nodeManager = null;
-            SnapshotFileReader reader = null;
-            try {
-                nodeManager = new MetadataNodeManager();
-                reader = new SnapshotFileReader(snapshotPath, nodeManager.logListener());
-                return new MetadataShell(null, reader, nodeManager);
-            } catch (Throwable e) {
-                log.error("Initialization error", e);
-                if (reader != null) {
-                    reader.close();
-                }
-                if (nodeManager != null) {
-                    nodeManager.close();
-                }
-                throw e;
-            }
+        public Builder setFaultHandler(FaultHandler faultHandler) {
+            this.faultHandler = faultHandler;
+            return this;
+        }
+
+        public MetadataShell build() {
+            return new MetadataShell(raftManager,
+                snapshotPath,
+                faultHandler);
         }
     }
 
+    private final MetadataShellState state;
+
     private final KafkaRaftManager<ApiMessageAndVersion> raftManager;
 
-    private final SnapshotFileReader snapshotFileReader;
+    private final String snapshotPath;
+
+    private final FaultHandler faultHandler;
 
-    private final MetadataNodeManager nodeManager;
+    private final MetadataShellPublisher publisher;
 
-    public MetadataShell(KafkaRaftManager<ApiMessageAndVersion> raftManager,
-                        SnapshotFileReader snapshotFileReader,
-                        MetadataNodeManager nodeManager) {
+    private SnapshotFileReader snapshotFileReader;
+
+    private MetadataLoader loader;
+
+    public MetadataShell(
+        KafkaRaftManager<ApiMessageAndVersion> raftManager,
+        String snapshotPath,
+        FaultHandler faultHandler
+    ) {
+        this.state = new MetadataShellState();
         this.raftManager = raftManager;
-        this.snapshotFileReader = snapshotFileReader;
-        this.nodeManager = nodeManager;
+        this.snapshotPath = snapshotPath;
+        this.faultHandler = faultHandler;
+        this.publisher = new MetadataShellPublisher(state);
+        this.snapshotFileReader = null;
+    }
+
+    private void initializeWithRaftManager() {
+        raftManager.startup();
+        this.loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setNodeId(-1).
+                setHighWaterMarkAccessor(() -> raftManager.client().highWatermark()).
+                build();
+        raftManager.register(loader);
+    }
+
+    private void initializeWithSnapshotFileReader() throws Exception {
+        this.loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setNodeId(-1).
+                setHighWaterMarkAccessor(() -> snapshotFileReader.highWaterMark()).
+                build();
+        snapshotFileReader = new SnapshotFileReader(snapshotPath, loader);
+        snapshotFileReader.startup();
     }
 
     public void run(List<String> args) throws Exception {
-        nodeManager.setup();
         if (raftManager != null) {
-            raftManager.startup();
-            raftManager.register(nodeManager.logListener());
-        } else if (snapshotFileReader != null) {
-            snapshotFileReader.startup();
+            if (snapshotPath != null) {
+                throw new RuntimeException("Can't specify both a raft manager and " +
+                        "snapshot file reader.");
+            }
+            initializeWithRaftManager();
+        } else if (snapshotPath != null) {
+            initializeWithSnapshotFileReader();
         } else {
-            throw new RuntimeException("Expected either a raft manager or snapshot reader");
+            throw new RuntimeException("You must specify either a raft manager or a " +
+                    "snapshot file reader.");
         }
+        loader.installPublishers(Collections.singletonList(publisher)).get(15, TimeUnit.MINUTES);
         if (args == null || args.isEmpty()) {
             // Interactive mode.
             System.out.println("Loading...");
             waitUntilCaughtUp();
             System.out.println("Starting...");
-            try (InteractiveShell shell = new InteractiveShell(nodeManager)) {
+            try (InteractiveShell shell = new InteractiveShell(state)) {
                 shell.runMainLoop();
             }
         } else {
@@ -116,20 +157,22 @@ public final class MetadataShell {
             try (PrintWriter writer = new PrintWriter(new BufferedWriter(
                     new OutputStreamWriter(System.out, StandardCharsets.UTF_8)))) {
                 Commands.Handler handler = commands.parseCommand(args);
-                handler.run(Optional.empty(), writer, nodeManager);
+                handler.run(Optional.empty(), writer, state);
                 writer.flush();
             }
         }
     }
 
     public void close() throws Exception {
+        Utils.closeQuietly(loader, "loader");
         if (raftManager != null) {
-            raftManager.shutdown();
-        }
-        if (snapshotFileReader != null) {
-            snapshotFileReader.close();
+            try {
+                raftManager.shutdown();
+            } catch (Exception e) {
+                log.error("Error shutting down RaftManager", e);
+            }
         }
-        nodeManager.close();
+        Utils.closeQuietly(snapshotFileReader, "snapshotFileReader");
     }
 
     public static void main(String[] args) throws Exception {
@@ -176,6 +219,12 @@ public final class MetadataShell {
     }
 
     void waitUntilCaughtUp() throws ExecutionException, InterruptedException {
-        snapshotFileReader.caughtUpFuture().get();
+        while (true) {
+            if (loader.lastAppliedOffset() > 0) {
+                return;
+            }
+            Thread.sleep(10);
+        }
+        // NOTE(review): polling above replaces the former snapshotFileReader.caughtUpFuture().get() wait.
     }
 }
diff --git a/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/CatCommandHandler.java
similarity index 73%
copy from shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java
copy to shell/src/main/java/org/apache/kafka/shell/command/CatCommandHandler.java
index 3fc94279565..9cd7603f94c 100644
--- a/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/CatCommandHandler.java
@@ -15,12 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.kafka.shell.MetadataNode.DirectoryNode;
-import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.apache.kafka.image.node.MetadataNode;
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.glob.GlobVisitor;
+import org.apache.kafka.shell.node.printer.ShellNodePrinter;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +52,7 @@ public final class CatCommandHandler implements Commands.Handler {
 
         @Override
         public String description() {
-            return "Show the contents of metadata nodes.";
+            return "Show the contents of metadata files.";
         }
 
         @Override
@@ -61,7 +64,7 @@ public final class CatCommandHandler implements Commands.Handler {
         public void addArguments(ArgumentParser parser) {
             parser.addArgument("targets").
                 nargs("+").
-                help("The metadata nodes to display.");
+                help("The metadata files to display.");
         }
 
         @Override
@@ -70,10 +73,12 @@ public final class CatCommandHandler implements Commands.Handler {
         }
 
         @Override
-        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
-                                 List<Candidate> candidates) throws Exception {
-            CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
-                candidates);
+        public void completeNext(
+            MetadataShellState state,
+            List<String> nextWords,
+            List<Candidate> candidates
+        ) throws Exception {
+            CommandUtils.completePath(state, nextWords.get(nextWords.size() - 1), candidates);
         }
     }
 
@@ -84,19 +89,21 @@ public final class CatCommandHandler implements Commands.Handler {
     }
 
     @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) throws Exception {
+    public void run(
+        Optional<InteractiveShell> shell,
+        PrintWriter writer,
+        MetadataShellState state
+    ) throws Exception {
         log.trace("cat " + targets);
         for (String target : targets) {
-            manager.visit(new GlobVisitor(target, entryOption -> {
+            state.visit(new GlobVisitor(target, entryOption -> {
                 if (entryOption.isPresent()) {
                     MetadataNode node = entryOption.get().node();
-                    if (node instanceof DirectoryNode) {
+                    if (node.isDirectory()) {
                         writer.println("cat: " + target + ": Is a directory");
-                    } else if (node instanceof FileNode) {
-                        FileNode fileNode = (FileNode) node;
-                        writer.println(fileNode.contents());
+                    } else {
+                        ShellNodePrinter printer = new ShellNodePrinter(writer);
+                        node.print(printer);
                     }
                 } else {
                     writer.println("cat: " + target + ": No such file or directory.");
diff --git a/shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/CdCommandHandler.java
similarity index 66%
rename from shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java
rename to shell/src/main/java/org/apache/kafka/shell/command/CdCommandHandler.java
index 8d270e54328..71057a4ade7 100644
--- a/shell/src/main/java/org/apache/kafka/shell/CdCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/CdCommandHandler.java
@@ -15,17 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.glob.GlobVisitor;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 
 import java.io.PrintWriter;
 import java.util.List;
 import java.util.Optional;
-import java.util.function.Consumer;
 
 /**
  * Implements the cd command.
@@ -65,10 +66,13 @@ public final class CdCommandHandler implements Commands.Handler {
         }
 
         @Override
-        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
-                                 List<Candidate> candidates) throws Exception {
+        public void completeNext(
+            MetadataShellState state,
+            List<String> nextWords,
+            List<Candidate> candidates
+        ) throws Exception {
             if (nextWords.size() == 1) {
-                CommandUtils.completePath(nodeManager, nextWords.get(0), candidates);
+                CommandUtils.completePath(state, nextWords.get(0), candidates);
             }
         }
     }
@@ -80,26 +84,23 @@ public final class CdCommandHandler implements Commands.Handler {
     }
 
     @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) throws Exception {
+    public void run(
+        Optional<InteractiveShell> shell,
+        PrintWriter writer,
+        MetadataShellState state
+    ) throws Exception {
         String effectiveTarget = target.orElse("/");
-        manager.visit(new Consumer<MetadataNodeManager.Data>() {
-            @Override
-            public void accept(MetadataNodeManager.Data data) {
-                new GlobVisitor(effectiveTarget, entryOption -> {
-                    if (entryOption.isPresent()) {
-                        if (!(entryOption.get().node() instanceof DirectoryNode)) {
-                            writer.println("cd: " + effectiveTarget + ": not a directory.");
-                        } else {
-                            data.setWorkingDirectory(entryOption.get().absolutePath());
-                        }
-                    } else {
-                        writer.println("cd: " + effectiveTarget + ": no such directory.");
-                    }
-                }).accept(data);
+        new GlobVisitor(effectiveTarget, entryOption -> {
+            if (entryOption.isPresent()) {
+                if (!entryOption.get().node().isDirectory()) {
+                    writer.println("cd: " + effectiveTarget + ": not a directory.");
+                } else {
+                    state.setWorkingDirectory(entryOption.get().absolutePath());
+                }
+            } else {
+                writer.println("cd: " + effectiveTarget + ": no such directory.");
             }
-        });
+        }).accept(state);
     }
 
     @Override
diff --git a/shell/src/main/java/org/apache/kafka/shell/CommandUtils.java b/shell/src/main/java/org/apache/kafka/shell/command/CommandUtils.java
similarity index 83%
rename from shell/src/main/java/org/apache/kafka/shell/CommandUtils.java
rename to shell/src/main/java/org/apache/kafka/shell/command/CommandUtils.java
index 5febfb835a5..ae23284a7f7 100644
--- a/shell/src/main/java/org/apache/kafka/shell/CommandUtils.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/CommandUtils.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
-import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.image.node.MetadataNode;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map.Entry;
+import java.util.TreeSet;
 
 /**
  * Utility functions for command handlers.
@@ -97,33 +98,35 @@ public final class CommandUtils {
     /**
      * Generate a list of potential completions for a path.
      *
-     * @param nodeManager       The NodeManager.
+     * @param state             The MetadataShellState.
      * @param pathPrefix        The path prefix.  Non-null.
      * @param candidates        The list to add the output completions to.
      */
-    public static void completePath(MetadataNodeManager nodeManager,
-                                    String pathPrefix,
-                                    List<Candidate> candidates) throws Exception {
-        nodeManager.visit(data -> {
+    public static void completePath(
+        MetadataShellState state,
+        String pathPrefix,
+        List<Candidate> candidates
+    ) throws Exception {
+        state.visit(data -> {
             String absolutePath = pathPrefix.startsWith("/") ?
                 pathPrefix : data.workingDirectory() + "/" + pathPrefix;
             List<String> pathComponents = stripDotPathComponents(splitPath(absolutePath));
-            DirectoryNode directory = data.root();
+            MetadataNode directory = data.root();
             int numDirectories = pathPrefix.endsWith("/") ?
                 pathComponents.size() : pathComponents.size() - 1;
             for (int i = 0; i < numDirectories; i++) {
                 MetadataNode node = directory.child(pathComponents.get(i));
-                if (!(node instanceof DirectoryNode)) {
+                if (node == null || !node.isDirectory()) {
                     return;
                 }
-                directory = (DirectoryNode) node;
+                directory = node;
             }
             String lastComponent = "";
             if (numDirectories >= 0 && numDirectories < pathComponents.size()) {
                 lastComponent = pathComponents.get(numDirectories);
             }
-            Entry<String, MetadataNode> candidate =
-                directory.children().ceilingEntry(lastComponent);
+            TreeSet<String> children = new TreeSet<>(directory.childNames());
+            String candidate = children.ceiling(lastComponent);
             String effectivePrefix;
             int lastSlash = pathPrefix.lastIndexOf('/');
             if (lastSlash < 0) {
@@ -131,17 +134,18 @@ public final class CommandUtils {
             } else {
                 effectivePrefix = pathPrefix.substring(0, lastSlash + 1);
             }
-            while (candidate != null && candidate.getKey().startsWith(lastComponent)) {
+            while (candidate != null && candidate.startsWith(lastComponent)) {
                 StringBuilder candidateBuilder = new StringBuilder();
-                candidateBuilder.append(effectivePrefix).append(candidate.getKey());
+                candidateBuilder.append(effectivePrefix).append(candidate);
                 boolean complete = true;
-                if (candidate.getValue() instanceof DirectoryNode) {
+                MetadataNode child = directory.child(candidate);
+                if (child != null && child.isDirectory()) {
                     candidateBuilder.append("/");
                     complete = false;
                 }
                 candidates.add(new Candidate(candidateBuilder.toString(),
                     candidateBuilder.toString(), null, null, null, null, complete));
-                candidate = directory.children().higherEntry(candidate.getKey());
+                candidate = children.higher(candidate);
             }
         });
     }
diff --git a/shell/src/main/java/org/apache/kafka/shell/Commands.java b/shell/src/main/java/org/apache/kafka/shell/command/Commands.java
similarity index 83%
rename from shell/src/main/java/org/apache/kafka/shell/Commands.java
rename to shell/src/main/java/org/apache/kafka/shell/command/Commands.java
index db16411ebae..ddafb5e6920 100644
--- a/shell/src/main/java/org/apache/kafka/shell/Commands.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/Commands.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
@@ -24,6 +24,8 @@ import net.sourceforge.argparse4j.inf.Namespace;
 import net.sourceforge.argparse4j.inf.Subparser;
 import net.sourceforge.argparse4j.inf.Subparsers;
 import net.sourceforge.argparse4j.internal.HelpScreenException;
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 
 import java.io.PrintWriter;
@@ -42,20 +44,22 @@ public final class Commands {
     /**
      * A map from command names to command types.
      */
-    static final NavigableMap<String, Type> TYPES;
+    public static final NavigableMap<String, Type> TYPES;
 
     static {
         TreeMap<String, Type> typesMap = new TreeMap<>();
         for (Type type : Arrays.asList(
-                CatCommandHandler.TYPE,
-                CdCommandHandler.TYPE,
-                ExitCommandHandler.TYPE,
-                FindCommandHandler.TYPE,
-                HelpCommandHandler.TYPE,
-                HistoryCommandHandler.TYPE,
-                LsCommandHandler.TYPE,
-                ManCommandHandler.TYPE,
-                PwdCommandHandler.TYPE)) {
+            CatCommandHandler.TYPE,
+            CdCommandHandler.TYPE,
+            ExitCommandHandler.TYPE,
+            FindCommandHandler.TYPE,
+            HelpCommandHandler.TYPE,
+            HistoryCommandHandler.TYPE,
+            LsCommandHandler.TYPE,
+            ManCommandHandler.TYPE,
+            PwdCommandHandler.TYPE,
+            TreeCommandHandler.TYPE
+        )) {
             typesMap.put(type.name(), type);
         }
         TYPES = Collections.unmodifiableNavigableMap(typesMap);
@@ -66,9 +70,11 @@ public final class Commands {
      * execute commands.
      */
     public interface Handler {
-        void run(Optional<InteractiveShell> shell,
-                 PrintWriter writer,
-                 MetadataNodeManager manager) throws Exception;
+        void run(
+            Optional<InteractiveShell> shell,
+            PrintWriter writer,
+            MetadataShellState state
+        ) throws Exception;
     }
 
     /**
@@ -82,9 +88,18 @@ public final class Commands {
         boolean shellOnly();
         void addArguments(ArgumentParser parser);
         Handler createHandler(Namespace namespace);
-        void completeNext(MetadataNodeManager nodeManager,
-                          List<String> nextWords,
-                          List<Candidate> candidates) throws Exception;
+        /**
+         * Add completion candidates for the arguments of this command type.
+         *
+         * @param state         The MetadataShellState.
+         * @param nextWords     The words to complete (presumably those after the command name).
+         * @param candidates    The list to add the output completions to.
+         */
+        void completeNext(
+            MetadataShellState state,
+            List<String> nextWords,
+            List<Candidate> candidates
+        ) throws Exception;
     }
 
     private final ArgumentParser parser;
diff --git a/shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/ErroneousCommandHandler.java
similarity index 85%
rename from shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java
rename to shell/src/main/java/org/apache/kafka/shell/command/ErroneousCommandHandler.java
index d52c55f9630..27cb02a906b 100644
--- a/shell/src/main/java/org/apache/kafka/shell/ErroneousCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/ErroneousCommandHandler.java
@@ -15,7 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
+
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.state.MetadataShellState;
 
 import java.io.PrintWriter;
 import java.util.Objects;
@@ -32,9 +35,11 @@ public final class ErroneousCommandHandler implements Commands.Handler {
     }
 
     @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) {
+    public void run(
+        Optional<InteractiveShell> shell,
+        PrintWriter writer,
+        MetadataShellState state
+    ) {
         writer.println(message);
     }
 
diff --git a/shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/ExitCommandHandler.java
similarity index 82%
rename from shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java
rename to shell/src/main/java/org/apache/kafka/shell/command/ExitCommandHandler.java
index 2b11b352a8f..56f92de30f3 100644
--- a/shell/src/main/java/org/apache/kafka/shell/ExitCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/ExitCommandHandler.java
@@ -15,11 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 
 import java.io.PrintWriter;
@@ -62,16 +64,21 @@ public final class ExitCommandHandler implements Commands.Handler {
         }
 
         @Override
-        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
-                                 List<Candidate> candidates) throws Exception {
+        public void completeNext(
+            MetadataShellState state,
+            List<String> nextWords,
+            List<Candidate> candidates
+        ) throws Exception {
             // nothing to do
         }
     }
 
     @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) {
+    public void run(
+        Optional<InteractiveShell> shell,
+        PrintWriter writer,
+        MetadataShellState state
+    ) {
         Exit.exit(0);
     }
 
diff --git a/shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/FindCommandHandler.java
similarity index 71%
rename from shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java
rename to shell/src/main/java/org/apache/kafka/shell/command/FindCommandHandler.java
index 6d9ae44654b..8e11385b6ef 100644
--- a/shell/src/main/java/org/apache/kafka/shell/FindCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/FindCommandHandler.java
@@ -15,16 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.kafka.shell.MetadataNode.DirectoryNode;
+import org.apache.kafka.image.node.MetadataNode;
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.glob.GlobVisitor;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 
 import java.io.PrintWriter;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -66,10 +68,12 @@ public final class FindCommandHandler implements Commands.Handler {
         }
 
         @Override
-        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
-                                 List<Candidate> candidates) throws Exception {
-            CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
-                candidates);
+        public void completeNext(
+            MetadataShellState state,
+            List<String> nextWords,
+            List<Candidate> candidates
+        ) throws Exception {
+            CommandUtils.completePath(state, nextWords.get(nextWords.size() - 1), candidates);
         }
     }
 
@@ -80,28 +84,33 @@ public final class FindCommandHandler implements Commands.Handler {
     }
 
     @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) throws Exception {
+    public void run(
+        Optional<InteractiveShell> shell,
+        PrintWriter writer,
+        MetadataShellState state
+    ) throws Exception {
         for (String path : CommandUtils.getEffectivePaths(paths)) {
-            manager.visit(new GlobVisitor(path, entryOption -> {
+            new GlobVisitor(path, entryOption -> {
                 if (entryOption.isPresent()) {
                     find(writer, path, entryOption.get().node());
                 } else {
                     writer.println("find: " + path + ": no such file or directory.");
                 }
-            }));
+            }).accept(state);
         }
     }
 
     private void find(PrintWriter writer, String path, MetadataNode node) {
         writer.println(path);
-        if (node instanceof DirectoryNode) {
-            DirectoryNode directory = (DirectoryNode) node;
-            for (Entry<String, MetadataNode> entry : directory.children().entrySet()) {
-                String nextPath = path.equals("/") ?
-                    path + entry.getKey() : path + "/" + entry.getKey();
-                find(writer, nextPath, entry.getValue());
+        if (node.isDirectory()) {
+            for (String name : node.childNames()) {
+                String nextPath = path.equals("/") ? path + name : path + "/" + name;
+                MetadataNode child = node.child(name);
+                if (child == null) {
+                    throw new RuntimeException("Expected " + name + " to be a valid child of " +
+                            path + ", but it was not.");
+                }
+                find(writer, nextPath, child);
             }
         }
     }
diff --git a/shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/HelpCommandHandler.java
similarity index 83%
rename from shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java
rename to shell/src/main/java/org/apache/kafka/shell/command/HelpCommandHandler.java
index 829274eefcc..52345487b44 100644
--- a/shell/src/main/java/org/apache/kafka/shell/HelpCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/HelpCommandHandler.java
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 
 import java.io.PrintWriter;
@@ -61,16 +63,21 @@ public final class HelpCommandHandler implements Commands.Handler {
         }
 
         @Override
-        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
-                                 List<Candidate> candidates) throws Exception {
+        public void completeNext(
+            MetadataShellState state,
+            List<String> nextWords,
+            List<Candidate> candidates
+        ) throws Exception {
             // nothing to do
         }
     }
 
     @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) {
+    public void run(
+        Optional<InteractiveShell> shell,
+        PrintWriter writer,
+        MetadataShellState state
+    ) {
         writer.printf("Welcome to the Apache Kafka metadata shell.%n%n");
         new Commands(true).parser().printHelp(writer);
     }
diff --git a/shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/HistoryCommandHandler.java
similarity index 87%
rename from shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java
rename to shell/src/main/java/org/apache/kafka/shell/command/HistoryCommandHandler.java
index edf9def4c87..c3b299983ce 100644
--- a/shell/src/main/java/org/apache/kafka/shell/HistoryCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/HistoryCommandHandler.java
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 
 import java.io.PrintWriter;
@@ -68,8 +70,11 @@ public final class HistoryCommandHandler implements Commands.Handler {
         }
 
         @Override
-        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
-                                 List<Candidate> candidates) throws Exception {
+        public void completeNext(
+            MetadataShellState state,
+            List<String> nextWords,
+            List<Candidate> candidates
+        ) throws Exception {
             // nothing to do
         }
     }
@@ -81,9 +86,11 @@ public final class HistoryCommandHandler implements Commands.Handler {
     }
 
     @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) throws Exception {
+    public void run(
+        Optional<InteractiveShell> shell,
+        PrintWriter writer,
+        MetadataShellState state
+    ) throws Exception {
         if (!shell.isPresent()) {
             throw new RuntimeException("The history command requires a shell.");
         }
diff --git a/shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/LsCommandHandler.java
similarity index 91%
rename from shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java
rename to shell/src/main/java/org/apache/kafka/shell/command/LsCommandHandler.java
index 629be25f63c..728fe7282f0 100644
--- a/shell/src/main/java/org/apache/kafka/shell/LsCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/LsCommandHandler.java
@@ -15,13 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.kafka.shell.GlobVisitor.MetadataNodeInfo;
-import org.apache.kafka.shell.MetadataNode.DirectoryNode;
-import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.apache.kafka.image.node.MetadataNode;
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.glob.GlobVisitor;
+import org.apache.kafka.shell.glob.GlobVisitor.MetadataNodeInfo;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,10 +76,12 @@ public final class LsCommandHandler implements Commands.Handler {
         }
 
         @Override
-        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
-                                 List<Candidate> candidates) throws Exception {
-            CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
-                candidates);
+        public void completeNext(
+            MetadataShellState state,
+            List<String> nextWords,
+            List<Candidate> candidates
+        ) throws Exception {
+            CommandUtils.completePath(state, nextWords.get(nextWords.size() - 1), candidates);
         }
     }
 
@@ -98,23 +102,24 @@ public final class LsCommandHandler implements Commands.Handler {
     }
 
     @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) throws Exception {
+    public void run(
+        Optional<InteractiveShell> shell,
+        PrintWriter writer,
+        MetadataShellState state
+    ) throws Exception {
         List<String> targetFiles = new ArrayList<>();
         List<TargetDirectory> targetDirectories = new ArrayList<>();
         for (String target : CommandUtils.getEffectivePaths(targets)) {
-            manager.visit(new GlobVisitor(target, entryOption -> {
+            state.visit(new GlobVisitor(target, entryOption -> {
                 if (entryOption.isPresent()) {
                     MetadataNodeInfo info = entryOption.get();
                     MetadataNode node = info.node();
-                    if (node instanceof DirectoryNode) {
-                        DirectoryNode directory = (DirectoryNode) node;
+                    if (node.isDirectory()) {
                         List<String> children = new ArrayList<>();
-                        children.addAll(directory.children().keySet());
+                        children.addAll(node.childNames());
                         targetDirectories.add(
                             new TargetDirectory(info.lastPathComponent(), children));
-                    } else if (node instanceof FileNode) {
+                    } else {
                         targetFiles.add(info.lastPathComponent());
                     }
                 } else {
diff --git a/shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/ManCommandHandler.java
similarity index 87%
rename from shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java
rename to shell/src/main/java/org/apache/kafka/shell/command/ManCommandHandler.java
index dcd0b8cd716..f10e89b2bff 100644
--- a/shell/src/main/java/org/apache/kafka/shell/ManCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/ManCommandHandler.java
@@ -15,11 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 
 import java.io.PrintWriter;
@@ -66,8 +68,11 @@ public final class ManCommandHandler implements Commands.Handler {
         }
 
         @Override
-        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
-                                 List<Candidate> candidates) throws Exception {
+        public void completeNext(
+            MetadataShellState state,
+            List<String> nextWords,
+            List<Candidate> candidates
+        ) throws Exception {
             if (nextWords.size() == 1) {
                 CommandUtils.completeCommand(nextWords.get(0), candidates);
             }
@@ -79,9 +84,11 @@ public final class ManCommandHandler implements Commands.Handler {
     }
 
     @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) {
+    public void run(
+        Optional<InteractiveShell> shell,
+        PrintWriter writer,
+        MetadataShellState manager
+    ) {
         Commands.Type type = Commands.TYPES.get(cmd);
         if (type == null) {
             writer.println("man: unknown command " + cmd +
diff --git a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/NoOpCommandHandler.java
similarity index 81%
rename from shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
rename to shell/src/main/java/org/apache/kafka/shell/command/NoOpCommandHandler.java
index 1756ba76aa8..106d2ddb014 100644
--- a/shell/src/main/java/org/apache/kafka/shell/NoOpCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/NoOpCommandHandler.java
@@ -15,7 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
+
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.state.MetadataShellState;
 
 import java.io.PrintWriter;
 import java.util.Optional;
@@ -25,9 +28,11 @@ import java.util.Optional;
  */
 public final class NoOpCommandHandler implements Commands.Handler {
     @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) {
+    public void run(
+        Optional<InteractiveShell> shell,
+        PrintWriter writer,
+        MetadataShellState state
+    ) {
     }
 
     @Override
diff --git a/shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/PwdCommandHandler.java
similarity index 80%
rename from shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java
rename to shell/src/main/java/org/apache/kafka/shell/command/PwdCommandHandler.java
index 1e5b5da39ef..4a0752a4e70 100644
--- a/shell/src/main/java/org/apache/kafka/shell/PwdCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/PwdCommandHandler.java
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 
 import java.io.PrintWriter;
@@ -61,19 +63,22 @@ public final class PwdCommandHandler implements Commands.Handler {
         }
 
         @Override
-        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
-                                 List<Candidate> candidates) throws Exception {
+        public void completeNext(
+            MetadataShellState state,
+            List<String> nextWords,
+            List<Candidate> candidates
+        ) throws Exception {
             // nothing to do
         }
     }
 
     @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) throws Exception {
-        manager.visit(data -> {
-            writer.println(data.workingDirectory());
-        });
+    public void run(
+        Optional<InteractiveShell> shell,
+        PrintWriter writer,
+        MetadataShellState state
+    ) throws Exception {
+        writer.println(state.workingDirectory());
     }
 
     @Override
diff --git a/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java b/shell/src/main/java/org/apache/kafka/shell/command/TreeCommandHandler.java
similarity index 59%
rename from shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java
rename to shell/src/main/java/org/apache/kafka/shell/command/TreeCommandHandler.java
index 3fc94279565..1489e1f1500 100644
--- a/shell/src/main/java/org/apache/kafka/shell/CatCommandHandler.java
+++ b/shell/src/main/java/org/apache/kafka/shell/command/TreeCommandHandler.java
@@ -15,12 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.kafka.shell.MetadataNode.DirectoryNode;
-import org.apache.kafka.shell.MetadataNode.FileNode;
+import org.apache.kafka.image.node.MetadataNode;
+import org.apache.kafka.shell.InteractiveShell;
+import org.apache.kafka.shell.glob.GlobVisitor;
+import org.apache.kafka.shell.node.printer.ShellNodePrinter;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.jline.reader.Candidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,10 +34,10 @@ import java.util.Objects;
 import java.util.Optional;
 
 /**
- * Implements the cat command.
+ * Implements the tree command.
  */
-public final class CatCommandHandler implements Commands.Handler {
-    private static final Logger log = LoggerFactory.getLogger(CatCommandHandler.class);
+public final class TreeCommandHandler implements Commands.Handler {
+    private static final Logger log = LoggerFactory.getLogger(TreeCommandHandler.class);
 
     public final static Commands.Type TYPE = new CatCommandType();
 
@@ -44,12 +47,12 @@ public final class CatCommandHandler implements Commands.Handler {
 
         @Override
         public String name() {
-            return "cat";
+            return "tree";
         }
 
         @Override
         public String description() {
-            return "Show the contents of metadata nodes.";
+            return "Show the contents of metadata nodes in a tree structure.";
         }
 
         @Override
@@ -66,40 +69,40 @@ public final class CatCommandHandler implements Commands.Handler {
 
         @Override
         public Commands.Handler createHandler(Namespace namespace) {
-            return new CatCommandHandler(namespace.getList("targets"));
+            return new TreeCommandHandler(namespace.getList("targets"));
         }
 
         @Override
-        public void completeNext(MetadataNodeManager nodeManager, List<String> nextWords,
-                                 List<Candidate> candidates) throws Exception {
-            CommandUtils.completePath(nodeManager, nextWords.get(nextWords.size() - 1),
-                candidates);
+        public void completeNext(
+            MetadataShellState state,
+            List<String> nextWords,
+            List<Candidate> candidates
+        ) throws Exception {
+            CommandUtils.completePath(state, nextWords.get(nextWords.size() - 1), candidates);
         }
     }
 
     private final List<String> targets;
 
-    public CatCommandHandler(List<String> targets) {
+    public TreeCommandHandler(List<String> targets) {
         this.targets = targets;
     }
 
     @Override
-    public void run(Optional<InteractiveShell> shell,
-                    PrintWriter writer,
-                    MetadataNodeManager manager) throws Exception {
-        log.trace("cat " + targets);
+    public void run(
+        Optional<InteractiveShell> shell,
+        PrintWriter writer,
+        MetadataShellState state
+    ) throws Exception {
+        log.trace("tree " + targets);
         for (String target : targets) {
-            manager.visit(new GlobVisitor(target, entryOption -> {
+            state.visit(new GlobVisitor(target, entryOption -> {
                 if (entryOption.isPresent()) {
                     MetadataNode node = entryOption.get().node();
-                    if (node instanceof DirectoryNode) {
-                        writer.println("cat: " + target + ": Is a directory");
-                    } else if (node instanceof FileNode) {
-                        FileNode fileNode = (FileNode) node;
-                        writer.println(fileNode.contents());
-                    }
+                    ShellNodePrinter printer = new ShellNodePrinter(writer);
+                    node.print(printer);
                 } else {
-                    writer.println("cat: " + target + ": No such file or directory.");
+                    writer.println("tree: " + target + ": No such file or directory.");
                 }
             }));
         }
@@ -112,8 +115,8 @@ public final class CatCommandHandler implements Commands.Handler {
 
     @Override
     public boolean equals(Object other) {
-        if (!(other instanceof CatCommandHandler)) return false;
-        CatCommandHandler o = (CatCommandHandler) other;
+        if (!(other instanceof TreeCommandHandler)) return false;
+        TreeCommandHandler o = (TreeCommandHandler) other;
         if (!Objects.equals(o.targets, targets)) return false;
         return true;
     }
diff --git a/shell/src/main/java/org/apache/kafka/shell/GlobComponent.java b/shell/src/main/java/org/apache/kafka/shell/glob/GlobComponent.java
similarity index 99%
rename from shell/src/main/java/org/apache/kafka/shell/GlobComponent.java
rename to shell/src/main/java/org/apache/kafka/shell/glob/GlobComponent.java
index b93382b258e..4de7616b8c3 100644
--- a/shell/src/main/java/org/apache/kafka/shell/GlobComponent.java
+++ b/shell/src/main/java/org/apache/kafka/shell/glob/GlobComponent.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.glob;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java b/shell/src/main/java/org/apache/kafka/shell/glob/GlobVisitor.java
similarity index 74%
rename from shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java
rename to shell/src/main/java/org/apache/kafka/shell/glob/GlobVisitor.java
index 8081b7e4450..1bdd48c4fc2 100644
--- a/shell/src/main/java/org/apache/kafka/shell/GlobVisitor.java
+++ b/shell/src/main/java/org/apache/kafka/shell/glob/GlobVisitor.java
@@ -15,11 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.glob;
 
+import org.apache.kafka.image.node.MetadataNode;
+import org.apache.kafka.shell.command.CommandUtils;
+import org.apache.kafka.shell.state.MetadataShellState;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Consumer;
@@ -27,7 +31,7 @@ import java.util.function.Consumer;
 /**
  * Visits metadata paths based on a glob string.
  */
-public final class GlobVisitor implements Consumer<MetadataNodeManager.Data> {
+public final class GlobVisitor implements Consumer<MetadataShellState> {
     private final String glob;
     private final Consumer<Optional<MetadataNodeInfo>> handler;
 
@@ -93,20 +97,26 @@ public final class GlobVisitor implements Consumer<MetadataNodeManager.Data> {
     }
 
     @Override
-    public void accept(MetadataNodeManager.Data data) {
+    public void accept(MetadataShellState state) {
         String fullGlob = glob.startsWith("/") ? glob :
-            data.workingDirectory() + "/" + glob;
+            state.workingDirectory() + "/" + glob;
         List<String> globComponents =
             CommandUtils.stripDotPathComponents(CommandUtils.splitPath(fullGlob));
-        if (!accept(globComponents, 0, data.root(), new String[0])) {
+        MetadataNode root = state.root();
+        if (root == null) {
+            throw new RuntimeException("Invalid null root");
+        }
+        if (!accept(globComponents, 0, root, new String[0])) {
             handler.accept(Optional.empty());
         }
     }
 
-    private boolean accept(List<String> globComponents,
-                           int componentIndex,
-                           MetadataNode node,
-                           String[] path) {
+    private boolean accept(
+        List<String> globComponents,
+        int componentIndex,
+        MetadataNode node,
+        String[] path
+    ) {
         if (componentIndex >= globComponents.size()) {
             handler.accept(Optional.of(new MetadataNodeInfo(path, node)));
             return true;
@@ -114,11 +124,10 @@ public final class GlobVisitor implements Consumer<MetadataNodeManager.Data> {
         String globComponentString = globComponents.get(componentIndex);
         GlobComponent globComponent = new GlobComponent(globComponentString);
         if (globComponent.literal()) {
-            if (!(node instanceof MetadataNode.DirectoryNode)) {
+            if (!node.isDirectory()) {
                 return false;
             }
-            MetadataNode.DirectoryNode directory = (MetadataNode.DirectoryNode) node;
-            MetadataNode child = directory.child(globComponent.component());
+            MetadataNode child = node.child(globComponent.component());
             if (child == null) {
                 return false;
             }
@@ -127,18 +136,23 @@ public final class GlobVisitor implements Consumer<MetadataNodeManager.Data> {
             newPath[path.length] = globComponent.component();
             return accept(globComponents, componentIndex + 1, child, newPath);
         }
-        if (!(node instanceof MetadataNode.DirectoryNode)) {
+        if (!node.isDirectory()) {
             return false;
         }
-        MetadataNode.DirectoryNode directory = (MetadataNode.DirectoryNode) node;
         boolean matchedAny = false;
-        for (Entry<String, MetadataNode> entry : directory.children().entrySet()) {
-            String nodeName = entry.getKey();
+        ArrayList<String> nodeChildNames = new ArrayList<>(node.childNames());
+        nodeChildNames.sort(String::compareTo);
+        for (String nodeName : nodeChildNames) {
             if (globComponent.matches(nodeName)) {
                 String[] newPath = new String[path.length + 1];
                 System.arraycopy(path, 0, newPath, 0, path.length);
                 newPath[path.length] = nodeName;
-                if (accept(globComponents, componentIndex + 1, entry.getValue(), newPath)) {
+                MetadataNode child = node.child(nodeName);
+                if (child == null) {
+                    throw new RuntimeException("Expected " + nodeName + " to be a valid child of " +
+                            node.getClass() + ", but it was not.");
+                }
+                if (accept(globComponents, componentIndex + 1, child, newPath)) {
                     matchedAny = true;
                 }
             }
diff --git a/shell/src/main/java/org/apache/kafka/shell/node/LocalShellNode.java b/shell/src/main/java/org/apache/kafka/shell/node/LocalShellNode.java
new file mode 100644
index 00000000000..7cb047d726e
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/node/LocalShellNode.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell.node;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.image.node.MetadataLeafNode;
+import org.apache.kafka.image.node.MetadataNode;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+
+/**
+ * The /local node of the metadata shell. Its children are leaf nodes describing the
+ * shell software itself: the version and the commit id reported by AppInfoParser.
+ */
+public class LocalShellNode implements MetadataNode {
+    /**
+     * The name of this node within the shell's root directory.
+     */
+    public static final String NAME = "local";
+
+    /**
+     * The name of the child leaf holding the shell software version.
+     */
+    public static final String VERSION = "version";
+
+    /**
+     * The name of the child leaf holding the shell software commit id.
+     */
+    public static final String COMMIT_ID = "commitId";
+
+    /**
+     * Lists the fixed set of children of this node.
+     *
+     * @return the child names, in the order VERSION, COMMIT_ID.
+     */
+    @Override
+    public Collection<String> childNames() {
+        return Arrays.asList(VERSION, COMMIT_ID);
+    }
+
+    /**
+     * Looks up a child of this node by name.
+     *
+     * @param name the child name; must not be null.
+     * @return     a newly created leaf for VERSION or COMMIT_ID, or null for any other name.
+     */
+    @Override
+    public MetadataNode child(String name) {
+        if (name.equals(COMMIT_ID)) {
+            return new MetadataLeafNode(AppInfoParser.getCommitId());
+        }
+        if (name.equals(VERSION)) {
+            return new MetadataLeafNode(AppInfoParser.getVersion());
+        }
+        return null;
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/node/RootShellNode.java b/shell/src/main/java/org/apache/kafka/shell/node/RootShellNode.java
new file mode 100644
index 00000000000..c5305c0fda4
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/node/RootShellNode.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell.node;
+
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.node.MetadataImageNode;
+import org.apache.kafka.image.node.MetadataNode;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * The root node of the Kafka metadata shell. It has two children: the local node, which
+ * describes the shell itself, and the image node, which wraps a MetadataImage.
+ */
+public class RootShellNode implements MetadataNode {
+    /**
+     * The metadata image exposed through the image child.
+     */
+    private final MetadataImage image;
+
+    /**
+     * Creates a root node backed by the given image.
+     *
+     * @param image the metadata image to expose.
+     */
+    public RootShellNode(MetadataImage image) {
+        this.image = image;
+    }
+
+    /**
+     * Lists the fixed set of children of the root.
+     *
+     * @return the names of the local node and of the image node, in that order.
+     */
+    @Override
+    public Collection<String> childNames() {
+        return Arrays.asList(LocalShellNode.NAME, MetadataImageNode.NAME);
+    }
+
+    /**
+     * Looks up a child of the root by name.
+     *
+     * @param name the child name; must not be null.
+     * @return     a new LocalShellNode or MetadataImageNode, or null if the name matches neither.
+     */
+    @Override
+    public MetadataNode child(String name) {
+        if (name.equals(LocalShellNode.NAME)) {
+            return new LocalShellNode();
+        }
+        return name.equals(MetadataImageNode.NAME) ? new MetadataImageNode(image) : null;
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/node/printer/ShellNodePrinter.java b/shell/src/main/java/org/apache/kafka/shell/node/printer/ShellNodePrinter.java
new file mode 100644
index 00000000000..de412781442
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/node/printer/ShellNodePrinter.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell.node.printer;
+
+import org.apache.kafka.image.node.printer.MetadataNodePrinter;
+import org.apache.kafka.image.node.printer.MetadataNodeRedactionCriteria;
+
+import java.io.PrintWriter;
+
+
+/**
+ * Prints Kafka metadata shell nodes to a PrintWriter, indenting by two spaces for each
+ * level of nesting. Redaction is disabled.
+ */
+public class ShellNodePrinter implements MetadataNodePrinter {
+    /**
+     * The destination of all printed output.
+     */
+    private final PrintWriter writer;
+
+    /**
+     * The current nesting depth: incremented by enterNode, decremented by leaveNode.
+     */
+    private int indentationLevel;
+
+    public ShellNodePrinter(PrintWriter writer) {
+        this.writer = writer;
+    }
+
+    /**
+     * Builds the whitespace that prefixes a line at the current nesting depth.
+     *
+     * @return two spaces per indentation level; empty if the level is zero or less.
+     */
+    String indentationString() {
+        StringBuilder spaces = new StringBuilder();
+        for (int depth = 0; depth < indentationLevel; depth++) {
+            spaces.append("  ");
+        }
+        return spaces.toString();
+    }
+
+    /**
+     * @return the Disabled redaction criteria instance.
+     */
+    @Override
+    public MetadataNodeRedactionCriteria redactionCriteria() {
+        return MetadataNodeRedactionCriteria.Disabled.INSTANCE;
+    }
+
+    /**
+     * Writes the node name followed by a colon on its own line, then indents what follows.
+     *
+     * @param name the name of the node being entered.
+     */
+    @Override
+    public void enterNode(String name) {
+        String header = String.format("%s%s:%n", indentationString(), name);
+        writer.append(header);
+        indentationLevel++;
+    }
+
+    /**
+     * Ends the current node by removing one level of indentation.
+     */
+    @Override
+    public void leaveNode() {
+        indentationLevel--;
+    }
+
+    /**
+     * Writes the text on its own line at the current indentation.
+     *
+     * @param text the text to print.
+     */
+    @Override
+    public void output(String text) {
+        String line = String.format("%s%s%n", indentationString(), text);
+        writer.append(line);
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/state/MetadataShellPublisher.java b/shell/src/main/java/org/apache/kafka/shell/state/MetadataShellPublisher.java
new file mode 100644
index 00000000000..c0c6d296b4b
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/state/MetadataShellPublisher.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell.state;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.shell.node.RootShellNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Publishes changes to the Kafka Metadata Shell: each metadata update replaces the root
+ * node of the shell state with one built from the new image.
+ */
+public class MetadataShellPublisher implements MetadataPublisher {
+    private static final Logger log = LoggerFactory.getLogger(MetadataShellPublisher.class);
+
+    /**
+     * The shell state whose root is replaced on every metadata update.
+     */
+    private final MetadataShellState state;
+
+    /**
+     * Creates a publisher that updates the given shell state.
+     *
+     * @param state the shell state to update.
+     */
+    public MetadataShellPublisher(MetadataShellState state) {
+        this.state = state;
+    }
+
+    /**
+     * @return the name of this publisher.
+     */
+    @Override
+    public String name() {
+        return "MetadataShellPublisher";
+    }
+
+    /**
+     * Installs a new root node that wraps the latest image.
+     *
+     * @param delta    the metadata delta; not used here.
+     * @param newImage the new metadata image.
+     * @param manifest the loader manifest; not used here.
+     */
+    @Override
+    public void onMetadataUpdate(
+        MetadataDelta delta,
+        MetadataImage newImage,
+        LoaderManifest manifest
+    ) {
+        log.trace("onMetadataUpdate newImage={}", newImage);
+        RootShellNode newRoot = new RootShellNode(newImage);
+        state.setRoot(newRoot);
+    }
+}
diff --git a/shell/src/main/java/org/apache/kafka/shell/state/MetadataShellState.java b/shell/src/main/java/org/apache/kafka/shell/state/MetadataShellState.java
new file mode 100644
index 00000000000..cd7e4b958b6
--- /dev/null
+++ b/shell/src/main/java/org/apache/kafka/shell/state/MetadataShellState.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.shell.state;
+
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.node.MetadataNode;
+import org.apache.kafka.shell.node.RootShellNode;
+
+import java.util.function.Consumer;
+
+/**
+ * The mutable state of the Kafka metadata shell: the root of the node tree and the
+ * current working directory. Both fields are volatile and are read and written
+ * independently of each other.
+ */
+public class MetadataShellState {
+    /**
+     * The root of the node tree. Initially a RootShellNode over MetadataImage.EMPTY.
+     */
+    private volatile MetadataNode root = new RootShellNode(MetadataImage.EMPTY);
+
+    /**
+     * The current working directory of the shell. Initially "/".
+     */
+    private volatile String workingDirectory = "/";
+
+    /**
+     * @return the current root node.
+     */
+    public MetadataNode root() {
+        return root;
+    }
+
+    /**
+     * Replaces the root node.
+     *
+     * @param root the new root node.
+     */
+    public void setRoot(MetadataNode root) {
+        this.root = root;
+    }
+
+    /**
+     * @return the current working directory.
+     */
+    public String workingDirectory() {
+        return workingDirectory;
+    }
+
+    /**
+     * Changes the working directory.
+     *
+     * @param workingDirectory the new working directory.
+     */
+    public void setWorkingDirectory(String workingDirectory) {
+        this.workingDirectory = workingDirectory;
+    }
+
+    /**
+     * Passes this state object to the given consumer, on the calling thread.
+     *
+     * @param consumer the consumer to invoke.
+     */
+    public void visit(Consumer<MetadataShellState> consumer) {
+        consumer.accept(this);
+    }
+}
diff --git a/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java b/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java
deleted file mode 100644
index 8d3e5e3ce7e..00000000000
--- a/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.shell;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-@Timeout(value = 120000, unit = MILLISECONDS)
-public class GlobComponentTest {
-    private void verifyIsLiteral(GlobComponent globComponent, String component) {
-        assertTrue(globComponent.literal());
-        assertEquals(component, globComponent.component());
-        assertTrue(globComponent.matches(component));
-        assertFalse(globComponent.matches(component + "foo"));
-    }
-
-    @Test
-    public void testLiteralComponent() {
-        verifyIsLiteral(new GlobComponent("abc"), "abc");
-        verifyIsLiteral(new GlobComponent(""), "");
-        verifyIsLiteral(new GlobComponent("foobar_123"), "foobar_123");
-        verifyIsLiteral(new GlobComponent("$blah+"), "$blah+");
-    }
-
-    @Test
-    public void testToRegularExpression() {
-        assertNull(GlobComponent.toRegularExpression("blah"));
-        assertNull(GlobComponent.toRegularExpression(""));
-        assertNull(GlobComponent.toRegularExpression("does not need a regex, actually"));
-        assertEquals("^\\$blah.*$", GlobComponent.toRegularExpression("$blah*"));
-        assertEquals("^.*$", GlobComponent.toRegularExpression("*"));
-        assertEquals("^foo(?:(?:bar)|(?:baz))$", GlobComponent.toRegularExpression("foo{bar,baz}"));
-    }
-
-    @Test
-    public void testGlobMatch() {
-        GlobComponent star = new GlobComponent("*");
-        assertFalse(star.literal());
-        assertTrue(star.matches(""));
-        assertTrue(star.matches("anything"));
-        GlobComponent question = new GlobComponent("b?b");
-        assertFalse(question.literal());
-        assertFalse(question.matches(""));
-        assertTrue(question.matches("bob"));
-        assertTrue(question.matches("bib"));
-        assertFalse(question.matches("bic"));
-        GlobComponent foobarOrFoobaz = new GlobComponent("foo{bar,baz}");
-        assertFalse(foobarOrFoobaz.literal());
-        assertTrue(foobarOrFoobaz.matches("foobar"));
-        assertTrue(foobarOrFoobaz.matches("foobaz"));
-        assertFalse(foobarOrFoobaz.matches("foobah"));
-        assertFalse(foobarOrFoobaz.matches("foo"));
-        assertFalse(foobarOrFoobaz.matches("baz"));
-    }
-}
diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
deleted file mode 100644
index 88b3c48b6c7..00000000000
--- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.shell;
-
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.acl.AclOperation;
-import org.apache.kafka.common.acl.AclPermissionType;
-import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.metadata.AccessControlEntryRecord;
-import org.apache.kafka.common.metadata.AccessControlEntryRecordJsonConverter;
-import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
-import org.apache.kafka.common.metadata.ClientQuotaRecord;
-import org.apache.kafka.common.metadata.ConfigRecord;
-import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.FeatureLevelRecordJsonConverter;
-import org.apache.kafka.common.metadata.FenceBrokerRecord;
-import org.apache.kafka.common.metadata.PartitionChangeRecord;
-import org.apache.kafka.common.metadata.PartitionRecord;
-import org.apache.kafka.common.metadata.PartitionRecordJsonConverter;
-import org.apache.kafka.common.metadata.ProducerIdsRecord;
-import org.apache.kafka.common.metadata.RegisterBrokerRecord;
-import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
-import org.apache.kafka.common.metadata.RemoveTopicRecord;
-import org.apache.kafka.common.metadata.TopicRecord;
-import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
-import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
-import org.apache.kafka.common.resource.PatternType;
-import org.apache.kafka.common.resource.ResourceType;
-import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
-import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
-import org.apache.kafka.metadata.LeaderRecoveryState;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-
-import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-
-
-public class MetadataNodeManagerTest {
-
-    private MetadataNodeManager metadataNodeManager;
-
-    @BeforeEach
-    public void setup() throws Exception {
-        metadataNodeManager = new MetadataNodeManager();
-        metadataNodeManager.setup();
-    }
-
-    @AfterEach
-    public void cleanup() throws Exception {
-        metadataNodeManager.close();
-    }
-
-    @Test
-    public void testRegisterBrokerRecordAndUnregisterBrokerRecord() {
-        // Register broker
-        RegisterBrokerRecord record = new RegisterBrokerRecord()
-            .setBrokerId(1)
-            .setBrokerEpoch(2);
-        metadataNodeManager.handleMessage(record);
-
-        assertEquals(record.toString(),
-            metadataNodeManager.getData().root().directory("brokers", "1").file("registration").contents());
-        assertEquals("true",
-            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
-
-        // Unregister broker
-        UnregisterBrokerRecord unregisterBrokerRecord = new UnregisterBrokerRecord()
-            .setBrokerId(1);
-        metadataNodeManager.handleMessage(unregisterBrokerRecord);
-        assertFalse(metadataNodeManager.getData().root().directory("brokers").children().containsKey("1"));
-    }
-
-    @Test
-    public void testTopicRecordAndRemoveTopicRecord() {
-        // Add topic
-        TopicRecord topicRecord = new TopicRecord()
-            .setName("topicName")
-            .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"));
-
-        metadataNodeManager.handleMessage(topicRecord);
-
-        assertEquals("topicName",
-            metadataNodeManager.getData().root().directory("topics", "topicName").file("name").contents());
-        assertEquals("GcaQDl2UTsCNs1p9s37XkQ",
-            metadataNodeManager.getData().root().directory("topics", "topicName").file("id").contents());
-        assertEquals("topicName",
-            metadataNodeManager.getData().root().directory("topicIds", "GcaQDl2UTsCNs1p9s37XkQ").file("name").contents());
-        assertEquals("GcaQDl2UTsCNs1p9s37XkQ",
-            metadataNodeManager.getData().root().directory("topicIds", "GcaQDl2UTsCNs1p9s37XkQ").file("id").contents());
-
-        // Remove topic
-        RemoveTopicRecord removeTopicRecord = new RemoveTopicRecord()
-            .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"));
-
-        metadataNodeManager.handleMessage(removeTopicRecord);
-
-        assertFalse(
-            metadataNodeManager.getData().root().directory("topicIds").children().containsKey("GcaQDl2UTsCNs1p9s37XkQ"));
-        assertFalse(
-            metadataNodeManager.getData().root().directory("topics").children().containsKey("topicName"));
-    }
-
-    @Test
-    public void testPartitionRecord() {
-        PartitionRecord record = new PartitionRecord()
-            .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
-            .setPartitionId(0)
-            .setLeaderEpoch(1)
-            .setReplicas(Arrays.asList(1, 2, 3))
-            .setIsr(Arrays.asList(1, 2, 3));
-
-        metadataNodeManager.handleMessage(record);
-        assertEquals(
-            PartitionRecordJsonConverter.write(record, PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
-            metadataNodeManager.getData().root().directory("topicIds", "GcaQDl2UTsCNs1p9s37XkQ", "0").file("data").contents());
-    }
-
-    @Test
-    public void testValidConfigRecord() {
-        checkValidConfigRecord(ConfigResource.Type.BROKER.id(), "broker", "0", "0");
-        checkValidConfigRecord(ConfigResource.Type.TOPIC.id(), "topic", "0", "0");
-    }
-
-    @Test
-    public void testDefaultBrokerRecord() {
-        checkValidConfigRecord(ConfigResource.Type.BROKER.id(), "broker", "", "<default>");
-        // Default topic resources are not allowed, so we don't test it.
-    }
-
-    private void checkValidConfigRecord(byte resourceType, String typeString, String resourceName, String resourceNameKey) {
-        ConfigRecord configRecord = new ConfigRecord()
-            .setResourceType(resourceType)
-            .setResourceName(resourceName)
-            .setName("name")
-            .setValue("kraft");
-
-        metadataNodeManager.handleMessage(configRecord);
-        assertEquals("kraft",
-            metadataNodeManager.getData().root().directory("configs", typeString, resourceNameKey).file("name").contents());
-
-        // null value indicates delete
-        configRecord.setValue(null);
-        metadataNodeManager.handleMessage(configRecord);
-        assertFalse(
-            metadataNodeManager.getData().root().directory("configs", typeString, resourceNameKey).children().containsKey("name"));
-    }
-
-    @Test
-    public void testInvalidConfigRecord() {
-        checkInvalidConfigRecord(ConfigResource.Type.BROKER_LOGGER.id());
-        checkInvalidConfigRecord(ConfigResource.Type.UNKNOWN.id());
-    }
-
-    private void checkInvalidConfigRecord(byte resourceType) {
-        ConfigRecord configRecord = new ConfigRecord()
-            .setResourceType(resourceType)
-            .setResourceName("0")
-            .setName("name")
-            .setValue("kraft");
-        metadataNodeManager.handleMessage(configRecord);
-        assertFalse(metadataNodeManager.getData().root().children().containsKey("configs"));
-    }
-
-    @Test
-    public void testPartitionChangeRecord() {
-        PartitionRecord oldPartitionRecord = new PartitionRecord()
-            .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
-            .setPartitionId(0)
-            .setPartitionEpoch(0)
-            .setLeader(0)
-            .setLeaderEpoch(0)
-            .setIsr(Arrays.asList(0, 1, 2))
-            .setReplicas(Arrays.asList(0, 1, 2));
-
-        PartitionChangeRecord partitionChangeRecord = new PartitionChangeRecord()
-            .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
-            .setPartitionId(0)
-            .setLeader(NO_LEADER_CHANGE)
-            .setReplicas(Arrays.asList(0, 1, 2));
-
-        PartitionRecord newPartitionRecord = new PartitionRecord()
-            .setTopicId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
-            .setPartitionId(0)
-            .setPartitionEpoch(1)
-            .setLeader(0)
-            .setLeaderEpoch(0)
-            .setIsr(Arrays.asList(0, 1, 2))
-            .setReplicas(Arrays.asList(0, 1, 2));
-
-        // Change nothing
-        checkPartitionChangeRecord(
-            oldPartitionRecord,
-            partitionChangeRecord,
-            newPartitionRecord
-        );
-
-        // Change isr
-        checkPartitionChangeRecord(
-            oldPartitionRecord,
-            partitionChangeRecord.duplicate().setIsr(Arrays.asList(0, 2)),
-            newPartitionRecord.duplicate().setIsr(Arrays.asList(0, 2))
-        );
-
-        // Change leader
-        checkPartitionChangeRecord(
-            oldPartitionRecord,
-            partitionChangeRecord.duplicate().setLeader(1),
-            newPartitionRecord.duplicate().setLeader(1).setLeaderEpoch(1)
-        );
-
-        // Change leader recovery state
-        checkPartitionChangeRecord(
-            oldPartitionRecord,
-            partitionChangeRecord.duplicate().setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()),
-            newPartitionRecord.duplicate().setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value()));
-    }
-
-    private void checkPartitionChangeRecord(PartitionRecord oldPartitionRecord,
-                                           PartitionChangeRecord partitionChangeRecord,
-                                           PartitionRecord newPartitionRecord) {
-        metadataNodeManager.handleMessage(oldPartitionRecord);
-        metadataNodeManager.handleMessage(partitionChangeRecord);
-        assertEquals(
-            PartitionRecordJsonConverter.write(newPartitionRecord, PartitionRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
-            metadataNodeManager.getData().root()
-                .directory("topicIds", oldPartitionRecord.topicId().toString(), oldPartitionRecord.partitionId() + "")
-                .file("data").contents()
-        );
-    }
-
-    @Test
-    public void testUnfenceBrokerRecordAndFenceBrokerRecord() {
-        RegisterBrokerRecord record = new RegisterBrokerRecord()
-            .setBrokerId(1)
-            .setBrokerEpoch(2);
-        metadataNodeManager.handleMessage(record);
-
-        assertEquals("true",
-            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
-
-        UnfenceBrokerRecord unfenceBrokerRecord = new UnfenceBrokerRecord()
-            .setId(1)
-            .setEpoch(2);
-        metadataNodeManager.handleMessage(unfenceBrokerRecord);
-        assertEquals("false",
-            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
-
-        FenceBrokerRecord fenceBrokerRecord = new FenceBrokerRecord()
-            .setId(1)
-            .setEpoch(2);
-        metadataNodeManager.handleMessage(fenceBrokerRecord);
-        assertEquals("true",
-            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
-    }
-
-    @Test
-    public void testBrokerRegistrationChangeRecord() {
-        RegisterBrokerRecord record = new RegisterBrokerRecord()
-            .setBrokerId(1)
-            .setBrokerEpoch(2);
-        metadataNodeManager.handleMessage(record);
-        assertEquals("true",
-            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
-
-        // Unfence broker
-        BrokerRegistrationChangeRecord record1 = new BrokerRegistrationChangeRecord()
-            .setBrokerId(1)
-            .setBrokerEpoch(2)
-            .setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
-        metadataNodeManager.handleMessage(record1);
-        assertEquals("false",
-            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
-
-        // Fence broker
-        BrokerRegistrationChangeRecord record2 = new BrokerRegistrationChangeRecord()
-            .setBrokerId(1)
-            .setBrokerEpoch(2)
-            .setFenced(BrokerRegistrationFencingChange.FENCE.value());
-        metadataNodeManager.handleMessage(record2);
-        assertEquals("true",
-            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
-
-        // Unchanged
-        BrokerRegistrationChangeRecord record3 = new BrokerRegistrationChangeRecord()
-            .setBrokerId(1)
-            .setBrokerEpoch(2)
-            .setFenced(BrokerRegistrationFencingChange.NONE.value());
-        metadataNodeManager.handleMessage(record3);
-        assertEquals("true",
-            metadataNodeManager.getData().root().directory("brokers", "1").file("isFenced").contents());
-
-        // Controlled shutdown
-        BrokerRegistrationChangeRecord record4 = new BrokerRegistrationChangeRecord()
-            .setBrokerId(1)
-            .setBrokerEpoch(2)
-            .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
-        metadataNodeManager.handleMessage(record4);
-        assertEquals("true",
-            metadataNodeManager.getData().root().directory("brokers", "1").file("inControlledShutdown").contents());
-
-        // Unchanged
-        BrokerRegistrationChangeRecord record5 = new BrokerRegistrationChangeRecord()
-            .setBrokerId(1)
-            .setBrokerEpoch(2)
-            .setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.NONE.value());
-        metadataNodeManager.handleMessage(record5);
-        assertEquals("true",
-            metadataNodeManager.getData().root().directory("brokers", "1").file("inControlledShutdown").contents());
-    }
-
-    @Test
-    public void testClientQuotaRecord() {
-        ClientQuotaRecord record = new ClientQuotaRecord()
-            .setEntity(Arrays.asList(
-                    new ClientQuotaRecord.EntityData()
-                        .setEntityType("user")
-                        .setEntityName("kraft"),
-                    new ClientQuotaRecord.EntityData()
-                        .setEntityType("client")
-                        .setEntityName("kstream")
-                ))
-            .setKey("producer_byte_rate")
-            .setValue(1000.0);
-
-        metadataNodeManager.handleMessage(record);
-
-        assertEquals("1000.0",
-            metadataNodeManager.getData().root().directory("client-quotas",
-                "client", "kstream",
-                "user", "kraft").file("producer_byte_rate").contents());
-
-        metadataNodeManager.handleMessage(record.setRemove(true));
-
-        assertFalse(
-            metadataNodeManager.getData().root().directory("client-quotas",
-                "client", "kstream",
-                "user", "kraft").children().containsKey("producer_byte_rate"));
-
-        record = new ClientQuotaRecord()
-            .setEntity(Collections.singletonList(
-                new ClientQuotaRecord.EntityData()
-                    .setEntityType("user")
-                    .setEntityName(null)
-            ))
-            .setKey("producer_byte_rate")
-            .setValue(2000.0);
-
-        metadataNodeManager.handleMessage(record);
-
-        assertEquals("2000.0",
-            metadataNodeManager.getData().root().directory("client-quotas",
-                "user", "<default>").file("producer_byte_rate").contents());
-    }
-
-    @Test
-    public void testProducerIdsRecord() {
-        // generate a producerId record
-        ProducerIdsRecord record1 = new ProducerIdsRecord()
-            .setBrokerId(0)
-            .setBrokerEpoch(1)
-            .setNextProducerId(10000);
-        metadataNodeManager.handleMessage(record1);
-
-        assertEquals(
-            "0",
-            metadataNodeManager.getData().root().directory("producerIds").file("lastBlockBrokerId").contents());
-        assertEquals(
-            "1",
-            metadataNodeManager.getData().root().directory("producerIds").file("lastBlockBrokerEpoch").contents());
-        assertEquals(
-            10000 + "",
-            metadataNodeManager.getData().root().directory("producerIds").file("nextBlockStartId").contents());
-
-        // generate another producerId record
-        ProducerIdsRecord record2 = new ProducerIdsRecord()
-            .setBrokerId(1)
-            .setBrokerEpoch(2)
-            .setNextProducerId(11000);
-        metadataNodeManager.handleMessage(record2);
-
-        assertEquals(
-            "1",
-            metadataNodeManager.getData().root().directory("producerIds").file("lastBlockBrokerId").contents());
-        assertEquals(
-            "2",
-            metadataNodeManager.getData().root().directory("producerIds").file("lastBlockBrokerEpoch").contents());
-        assertEquals(
-            11000 + "",
-            metadataNodeManager.getData().root().directory("producerIds").file("nextBlockStartId").contents());
-    }
-
-    @Test
-    public void testAccessControlEntryRecordAndRemoveAccessControlEntryRecord() {
-        AccessControlEntryRecord record1 = new AccessControlEntryRecord()
-            .setId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
-            .setHost("example.com")
-            .setResourceType(ResourceType.GROUP.code())
-            .setResourceName("group")
-            .setOperation(AclOperation.READ.code())
-            .setPermissionType(AclPermissionType.ALLOW.code())
-            .setPrincipal("User:kafka")
-            .setPatternType(PatternType.LITERAL.code());
-        metadataNodeManager.handleMessage(record1);
-        assertEquals(
-            AccessControlEntryRecordJsonConverter.write(record1, AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
-            metadataNodeManager.getData().root().directory("acl").directory("id").file("GcaQDl2UTsCNs1p9s37XkQ").contents());
-
-        RemoveAccessControlEntryRecord record2 = new RemoveAccessControlEntryRecord()
-            .setId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"));
-        metadataNodeManager.handleMessage(record2);
-        assertFalse(metadataNodeManager.getData().root().directory("acl").directory("id").children().containsKey("GcaQDl2UTsCNs1p9s37XkQ"));
-    }
-
-    @Test
-    public void testFeatureLevelRecord() {
-        FeatureLevelRecord record1 = new FeatureLevelRecord()
-            .setName("metadata.version")
-            .setFeatureLevel((short) 3);
-        metadataNodeManager.handleMessage(record1);
-        assertEquals(
-            FeatureLevelRecordJsonConverter.write(record1, FeatureLevelRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString(),
-            metadataNodeManager.getData().root().directory("features").file("metadata.version").contents());
-
-        FeatureLevelRecord record2 = new FeatureLevelRecord()
-            .setName("metadata.version")
-            .setFeatureLevel((short) 0);
-        metadataNodeManager.handleMessage(record2);
-        assertFalse(metadataNodeManager.getData().root().directory("features").children().containsKey("metadata.version"));
-    }
-}
diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java
deleted file mode 100644
index 72c0f7d21d3..00000000000
--- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.shell;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import org.apache.kafka.shell.MetadataNode.DirectoryNode;
-import org.apache.kafka.shell.MetadataNode.FileNode;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-
-@Timeout(value = 120000, unit = MILLISECONDS)
-public class MetadataNodeTest {
-    @Test
-    public void testMkdirs() {
-        DirectoryNode root = new DirectoryNode();
-        DirectoryNode defNode = root.mkdirs("abc", "def");
-        DirectoryNode defNode2 = root.mkdirs("abc", "def");
-        assertSame(defNode, defNode2);
-        DirectoryNode defNode3 = root.directory("abc", "def");
-        assertSame(defNode, defNode3);
-        root.mkdirs("ghi");
-        assertEquals(new HashSet<>(Arrays.asList("abc", "ghi")), root.children().keySet());
-        assertEquals(Collections.singleton("def"), root.mkdirs("abc").children().keySet());
-        assertEquals(Collections.emptySet(), defNode.children().keySet());
-    }
-
-    @Test
-    public void testRmrf() {
-        DirectoryNode root = new DirectoryNode();
-        DirectoryNode foo = root.mkdirs("foo");
-        foo.mkdirs("a");
-        foo.mkdirs("b");
-        root.mkdirs("baz");
-        assertEquals(new HashSet<>(Arrays.asList("foo", "baz")), root.children().keySet());
-        root.rmrf("foo", "a");
-        assertEquals(new HashSet<>(Arrays.asList("b")), foo.children().keySet());
-        root.rmrf("foo");
-        assertEquals(new HashSet<>(Collections.singleton("baz")), root.children().keySet());
-    }
-
-    @Test
-    public void testCreateFiles() {
-        DirectoryNode root = new DirectoryNode();
-        DirectoryNode abcdNode = root.mkdirs("abcd");
-        FileNode quuxNodde = abcdNode.create("quux");
-        quuxNodde.setContents("quux contents");
-        assertEquals("quux contents", quuxNodde.contents());
-        assertThrows(NotDirectoryException.class, () -> root.mkdirs("abcd", "quux"));
-    }
-}
diff --git a/shell/src/test/java/org/apache/kafka/shell/CommandTest.java b/shell/src/test/java/org/apache/kafka/shell/command/CommandTest.java
similarity index 97%
rename from shell/src/test/java/org/apache/kafka/shell/CommandTest.java
rename to shell/src/test/java/org/apache/kafka/shell/command/CommandTest.java
index c896a06caa0..0b88d133032 100644
--- a/shell/src/test/java/org/apache/kafka/shell/CommandTest.java
+++ b/shell/src/test/java/org/apache/kafka/shell/command/CommandTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -56,7 +56,7 @@ public class CommandTest {
     @Test
     public void testParseInvalidCommand() {
         assertEquals(new ErroneousCommandHandler("invalid choice: 'blah' (choose " +
-            "from 'cat', 'cd', 'exit', 'find', 'help', 'history', 'ls', 'man', 'pwd')"),
+            "from 'cat', 'cd', 'exit', 'find', 'help', 'history', 'ls', 'man', 'pwd', 'tree')"),
             new Commands(true).parseCommand(Arrays.asList("blah")));
     }
 
diff --git a/shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java b/shell/src/test/java/org/apache/kafka/shell/command/CommandUtilsTest.java
similarity index 90%
rename from shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java
rename to shell/src/test/java/org/apache/kafka/shell/command/CommandUtilsTest.java
index 90c3b5c1489..7ee36a9e91a 100644
--- a/shell/src/test/java/org/apache/kafka/shell/CommandUtilsTest.java
+++ b/shell/src/test/java/org/apache/kafka/shell/command/CommandUtilsTest.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import org.junit.jupiter.api.Test;
@@ -25,7 +25,7 @@ import org.junit.jupiter.api.Timeout;
 
 import java.util.Arrays;
 
-@Timeout(value = 120000, unit = MILLISECONDS)
+@Timeout(value = 5, unit = MINUTES)
 public class CommandUtilsTest {
     @Test
     public void testSplitPath() {
diff --git a/shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java b/shell/src/test/java/org/apache/kafka/shell/command/LsCommandHandlerTest.java
similarity index 95%
rename from shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java
rename to shell/src/test/java/org/apache/kafka/shell/command/LsCommandHandlerTest.java
index c845706f1b6..b2a8681b1a9 100644
--- a/shell/src/test/java/org/apache/kafka/shell/LsCommandHandlerTest.java
+++ b/shell/src/test/java/org/apache/kafka/shell/command/LsCommandHandlerTest.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.command;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import org.apache.kafka.shell.LsCommandHandler.ColumnSchema;
-import org.apache.kafka.shell.LsCommandHandler.TargetDirectory;
+import org.apache.kafka.shell.command.LsCommandHandler.ColumnSchema;
+import org.apache.kafka.shell.command.LsCommandHandler.TargetDirectory;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
diff --git a/shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java b/shell/src/test/java/org/apache/kafka/shell/glob/GlobVisitorTest.java
similarity index 65%
rename from shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java
rename to shell/src/test/java/org/apache/kafka/shell/glob/GlobVisitorTest.java
index 59eeb5db79e..000d8f2f017 100644
--- a/shell/src/test/java/org/apache/kafka/shell/GlobVisitorTest.java
+++ b/shell/src/test/java/org/apache/kafka/shell/glob/GlobVisitorTest.java
@@ -15,34 +15,86 @@
  * limitations under the License.
  */
 
-package org.apache.kafka.shell;
+package org.apache.kafka.shell.glob;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-import org.apache.kafka.shell.GlobVisitor.MetadataNodeInfo;
+import org.apache.kafka.image.node.MetadataNode;
+import org.apache.kafka.shell.glob.GlobVisitor.MetadataNodeInfo;
+import org.apache.kafka.shell.state.MetadataShellState;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
 
-@Timeout(value = 120000, unit = MILLISECONDS)
+@Timeout(value = 5, unit = MINUTES)
 public class GlobVisitorTest {
-    static private final MetadataNodeManager.Data DATA;
+    static private final MetadataShellState DATA;
+
+    static class TestNode implements MetadataNode { // in-memory MetadataNode used to hand-build the test tree
+        private final String name; // key under which a parent stores this node
+
+        private final Map<String, TestNode> children; // direct children, keyed by child name
+
+        private final boolean isDirectory; // value returned by isDirectory()
+
+        TestNode(String name, boolean isDirectory) { // node with no children and an explicit directory flag
+            this.name = name;
+            this.children = new HashMap<>();
+            this.isDirectory = isDirectory;
+        }
+
+        TestNode(String name, TestNode... children) { // directory node; passing no children is allowed
+            this.name = name;
+            this.children = new HashMap<>();
+            for (TestNode child : children) {
+                this.children.put(child.name, child); // a later child with the same name replaces an earlier one
+            }
+            this.isDirectory = true; // this constructor always yields a directory
+        }
+
+        @Override
+        public boolean isDirectory() {
+            return isDirectory;
+        }
+
+        @Override
+        public Collection<String> childNames() {
+            return children.keySet(); // HashMap key view, so iteration order is unspecified
+        }
+
+        @Override
+        public MetadataNode child(String name) {
+            return children.get(name); // null when there is no child with that name
+        }
+    }
 
     static {
-        DATA = new MetadataNodeManager.Data();
-        DATA.root().mkdirs("alpha", "beta", "gamma");
-        DATA.root().mkdirs("alpha", "theta");
-        DATA.root().mkdirs("foo", "a");
-        DATA.root().mkdirs("foo", "beta");
-        DATA.root().mkdirs("zeta").create("c");
-        DATA.root().mkdirs("zeta");
-        DATA.root().create("zzz");
+        DATA = new MetadataShellState();
+        DATA.setRoot(new TestNode("", // the root node is given the empty name
+            new TestNode("alpha",
+                new TestNode("beta",
+                    new TestNode("gamma")
+                ),
+                new TestNode("theta")
+            ),
+            new TestNode("foo",
+                new TestNode("a"),
+                new TestNode("beta")
+            ),
+            new TestNode("zeta",
+                new TestNode("c", false) // the only non-directory node in this tree
+            ),
+            new TestNode("zzz") // NOTE(review): built as a directory; the replaced setup used create("zzz") — confirm this is intended
+        ));
         DATA.setWorkingDirectory("foo");
     }
 
@@ -75,9 +127,9 @@ public class GlobVisitorTest {
         visitor.accept(DATA);
         assertEquals(Optional.of(Arrays.asList(
             new MetadataNodeInfo(new String[] {"foo", "a"},
-                DATA.root().directory("foo").child("a")),
+                DATA.root().child("foo").child("a")),
             new MetadataNodeInfo(new String[] {"foo", "beta"},
-                DATA.root().directory("foo").child("beta")))), consumer.infos);
+                DATA.root().child("foo").child("beta")))), consumer.infos);
     }
 
     @Test
@@ -117,11 +169,11 @@ public class GlobVisitorTest {
         visitor.accept(DATA);
         assertEquals(Optional.of(Arrays.asList(
             new MetadataNodeInfo(new String[] {"alpha", "beta"},
-                DATA.root().directory("alpha").child("beta")),
+                DATA.root().child("alpha").child("beta")),
             new MetadataNodeInfo(new String[] {"alpha", "theta"},
-                DATA.root().directory("alpha").child("theta")),
+                DATA.root().child("alpha").child("theta")),
             new MetadataNodeInfo(new String[] {"foo", "beta"},
-                DATA.root().directory("foo").child("beta")))), consumer.infos);
+                DATA.root().child("foo").child("beta")))), consumer.infos);
     }
 
     @Test
@@ -139,6 +191,6 @@ public class GlobVisitorTest {
         visitor.accept(DATA);
         assertEquals(Optional.of(Arrays.asList(
             new MetadataNodeInfo(new String[] {"alpha"},
-                DATA.root().directory("alpha")))), consumer.infos);
+                DATA.root().child("alpha")))), consumer.infos);
     }
 }