You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rn...@apache.org on 2023/04/17 21:52:40 UTC

[kafka] branch trunk updated: KAFKA-14735: Improve KRaft metadata image change performance at high topic counts (#13280)

This is an automated email from the ASF dual-hosted git repository.

rndgstn pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e27926f92b1 KAFKA-14735: Improve KRaft metadata image change performance at high topic counts (#13280)
e27926f92b1 is described below

commit e27926f92b1f6b34ed6731f33c712a5d0d594275
Author: Ron Dagostino <rn...@gmail.com>
AuthorDate: Mon Apr 17 17:52:28 2023 -0400

    KAFKA-14735: Improve KRaft metadata image change performance at high topic counts (#13280)
    
    Introduces the use of persistent data structures in the KRaft metadata image to avoid copying the entire TopicsImage upon every change. Performance that was O(<number of topics in the cluster>) is now O(<number of topics changing>), which yields dramatic time and GC improvements for the most common topic-related metadata events. The chosen underlying persistent collection library is abstracted away behind the ImmutableMap<> and ImmutableSet<> interfaces and their static factory methods.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Colin P. McCabe <cm...@apache.org>, Ismael Juma <is...@juma.me.uk>, Purshotam Chauhan <pc...@confluent.io>
---
 LICENSE-binary                                     |   3 +-
 build.gradle                                       |  10 +
 checkstyle/import-control-jmh-benchmarks.xml       |   1 +
 checkstyle/import-control-metadata.xml             | 176 ++++++++++++
 checkstyle/import-control-server-common.xml        |  82 ++++++
 checkstyle/import-control.xml                      | 133 ---------
 .../metadata/BrokerMetadataPublisherTest.scala     |   6 +-
 gradle/dependencies.gradle                         |   2 +
 .../metadata/KRaftMetadataRequestBenchmark.java    | 235 ++++++++++++++++
 .../TopicsImageSingleRecordChangeBenchmark.java    |  90 ++++++
 .../metadata/TopicsImageSnapshotLoadBenchmark.java | 112 ++++++++
 .../metadata/TopicsImageZonalOutageBenchmark.java  |  99 +++++++
 licenses/pcollections-MIT                          |  24 ++
 .../java/org/apache/kafka/image/TopicsDelta.java   |  40 ++-
 .../java/org/apache/kafka/image/TopicsImage.java   |  32 ++-
 .../metrics/ControllerMetricsTestUtils.java        |   8 +-
 .../org/apache/kafka/image/TopicsImageTest.java    |  13 +-
 .../kafka/server/immutable/ImmutableMap.java       |  64 +++++
 .../kafka/server/immutable/ImmutableSet.java       |  60 ++++
 .../pcollections/PCollectionsImmutableMap.java     | 223 +++++++++++++++
 .../pcollections/PCollectionsImmutableSet.java     | 188 +++++++++++++
 .../kafka/server/immutable/DelegationChecker.java  | 146 ++++++++++
 .../pcollections/PCollectionsImmutableMapTest.java | 310 +++++++++++++++++++++
 .../pcollections/PCollectionsImmutableSetTest.java | 274 ++++++++++++++++++
 24 files changed, 2148 insertions(+), 183 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 842962e61ad..131d64b8633 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -313,7 +313,8 @@ argparse4j-0.7.0, see: licenses/argparse-MIT
 jopt-simple-5.0.4, see: licenses/jopt-simple-MIT
 slf4j-api-1.7.36, see: licenses/slf4j-MIT
 slf4j-reload4j-1.7.36, see: licenses/slf4j-MIT
-classgraph-4.8.138, see: license/classgraph-MIT
+classgraph-4.8.138, see: licenses/classgraph-MIT
+pcollections-4.0.1, see: licenses/pcollections-MIT
 
 ---------------------------------------
 BSD 2-Clause
diff --git a/build.gradle b/build.gradle
index a4ea31ceed5..773aa7a7769 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1222,6 +1222,10 @@ project(':metadata') {
   javadoc {
     enabled = false
   }
+
+  checkstyle {
+    configProperties = checkstyleConfigProperties("import-control-metadata.xml")
+  }
 }
 
 project(':group-coordinator') {
@@ -1554,11 +1558,13 @@ project(':server-common') {
     implementation libs.slf4jApi
     implementation libs.metrics
     implementation libs.joptSimple
+    implementation libs.pcollections
 
     testImplementation project(':clients')
     testImplementation project(':clients').sourceSets.test.output
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
+    testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.
     testImplementation libs.hamcrest
 
     testRuntimeOnly libs.slf4jlog4j
@@ -1605,6 +1611,10 @@ project(':server-common') {
   clean.doFirst {
     delete "$buildDir/kafka/"
   }
+
+  checkstyle {
+    configProperties = checkstyleConfigProperties("import-control-server-common.xml")
+  }
 }
 
 project(':storage:api') {
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index d6e966498d8..4cbd34f89cb 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -51,6 +51,7 @@
     <allow pkg="org.apache.kafka.storage"/>
     <allow pkg="org.apache.kafka.clients"/>
     <allow pkg="org.apache.kafka.coordinator.group"/>
+    <allow pkg="org.apache.kafka.image"/>
     <allow pkg="org.apache.kafka.metadata"/>
     <allow pkg="org.apache.kafka.timeline" />
 
diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml
new file mode 100644
index 00000000000..ade866d6a07
--- /dev/null
+++ b/checkstyle/import-control-metadata.xml
@@ -0,0 +1,176 @@
+<!DOCTYPE import-control PUBLIC
+        "-//Puppy Crawl//DTD Import Control 1.1//EN"
+        "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<import-control pkg="org.apache.kafka">
+
+    <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+
+    <!-- common library dependencies -->
+    <allow pkg="java" />
+    <allow pkg="javax.management" />
+    <allow pkg="org.slf4j" />
+    <allow pkg="org.junit" />
+    <allow pkg="org.opentest4j" />
+    <allow pkg="org.hamcrest" />
+    <allow pkg="org.mockito" />
+    <allow pkg="org.easymock" />
+    <allow pkg="org.powermock" />
+    <allow pkg="java.security" />
+    <allow pkg="javax.net.ssl" />
+    <allow pkg="javax.security" />
+    <allow pkg="org.ietf.jgss" />
+    <allow pkg="net.jqwik.api" />
+
+    <!-- no one depends on the server -->
+    <disallow pkg="kafka" />
+
+    <!-- anyone can use public classes -->
+    <allow pkg="org.apache.kafka.common" exact-match="true" />
+    <allow pkg="org.apache.kafka.common.security" />
+    <allow pkg="org.apache.kafka.common.serialization" />
+    <allow pkg="org.apache.kafka.common.utils" />
+    <allow pkg="org.apache.kafka.common.errors" exact-match="true" />
+    <allow pkg="org.apache.kafka.common.memory" />
+
+    <!-- persistent collection factories/non-library-specific wrappers -->
+    <allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
+
+    <subpackage name="common">
+        <subpackage name="metadata">
+            <allow pkg="com.fasterxml.jackson" />
+            <allow pkg="org.apache.kafka.common.protocol" />
+            <allow pkg="org.apache.kafka.common.protocol.types" />
+            <allow pkg="org.apache.kafka.common.message" />
+            <allow pkg="org.apache.kafka.common.metadata" />
+        </subpackage>
+    </subpackage>
+
+    <subpackage name="controller">
+        <allow pkg="com.yammer.metrics"/>
+        <allow pkg="org.apache.kafka.clients" />
+        <allow pkg="org.apache.kafka.clients.admin" />
+        <allow pkg="org.apache.kafka.common.acl" />
+        <allow pkg="org.apache.kafka.common.annotation" />
+        <allow pkg="org.apache.kafka.common.config" />
+        <allow pkg="org.apache.kafka.common.feature" />
+        <allow pkg="org.apache.kafka.common.internals" />
+        <allow pkg="org.apache.kafka.common.message" />
+        <allow pkg="org.apache.kafka.common.metadata" />
+        <allow pkg="org.apache.kafka.common.metrics" />
+        <allow pkg="org.apache.kafka.common.network" />
+        <allow pkg="org.apache.kafka.common.protocol" />
+        <allow pkg="org.apache.kafka.common.quota" />
+        <allow pkg="org.apache.kafka.common.requests" />
+        <allow pkg="org.apache.kafka.common.resource" />
+        <allow pkg="org.apache.kafka.controller" />
+        <allow pkg="org.apache.kafka.image" />
+        <allow pkg="org.apache.kafka.image.writer" />
+        <allow pkg="org.apache.kafka.metadata" />
+        <allow pkg="org.apache.kafka.metadata.authorizer" />
+        <allow pkg="org.apache.kafka.metadata.migration" />
+        <allow pkg="org.apache.kafka.metalog" />
+        <allow pkg="org.apache.kafka.queue" />
+        <allow pkg="org.apache.kafka.raft" />
+        <allow pkg="org.apache.kafka.server.authorizer" />
+        <allow pkg="org.apache.kafka.server.common" />
+        <allow pkg="org.apache.kafka.server.config" />
+        <allow pkg="org.apache.kafka.server.fault" />
+        <allow pkg="org.apache.kafka.server.metrics" />
+        <allow pkg="org.apache.kafka.server.policy"/>
+        <allow pkg="org.apache.kafka.server.util"/>
+        <allow pkg="org.apache.kafka.snapshot" />
+        <allow pkg="org.apache.kafka.test" />
+        <allow pkg="org.apache.kafka.timeline" />
+    </subpackage>
+
+    <subpackage name="image">
+        <allow pkg="org.apache.kafka.common.config" />
+        <allow pkg="org.apache.kafka.common.message" />
+        <allow pkg="org.apache.kafka.common.metadata" />
+        <allow pkg="org.apache.kafka.common.protocol" />
+        <allow pkg="org.apache.kafka.common.quota" />
+        <allow pkg="org.apache.kafka.common.record" />
+        <allow pkg="org.apache.kafka.common.requests" />
+        <allow pkg="org.apache.kafka.common.resource" />
+        <allow pkg="org.apache.kafka.image" />
+        <allow pkg="org.apache.kafka.image.writer" />
+        <allow pkg="org.apache.kafka.metadata" />
+        <allow pkg="org.apache.kafka.queue" />
+        <allow pkg="org.apache.kafka.clients.admin" />
+        <allow pkg="org.apache.kafka.raft" />
+        <allow pkg="org.apache.kafka.server.common" />
+        <allow pkg="org.apache.kafka.server.fault" />
+        <allow pkg="org.apache.kafka.server.util" />
+        <allow pkg="org.apache.kafka.snapshot" />
+        <allow pkg="org.apache.kafka.test" />
+    </subpackage>
+
+    <subpackage name="metadata">
+        <allow pkg="org.apache.kafka.clients" />
+        <allow pkg="org.apache.kafka.common.acl" />
+        <allow pkg="org.apache.kafka.common.annotation" />
+        <allow pkg="org.apache.kafka.common.config" />
+        <allow pkg="org.apache.kafka.common.message" />
+        <allow pkg="org.apache.kafka.common.metadata" />
+        <allow pkg="org.apache.kafka.common.protocol" />
+        <allow pkg="org.apache.kafka.common.record" />
+        <allow pkg="org.apache.kafka.common.resource" />
+        <allow pkg="org.apache.kafka.common.requests" />
+        <allow pkg="org.apache.kafka.image" />
+        <allow pkg="org.apache.kafka.metadata" />
+        <allow pkg="org.apache.kafka.metalog" />
+        <allow pkg="org.apache.kafka.queue" />
+        <allow pkg="org.apache.kafka.raft" />
+        <allow pkg="org.apache.kafka.server.authorizer" />
+        <allow pkg="org.apache.kafka.server.common" />
+        <allow pkg="org.apache.kafka.server.fault" />
+        <allow pkg="org.apache.kafka.server.config" />
+        <allow pkg="org.apache.kafka.server.util"/>
+        <allow pkg="org.apache.kafka.test" />
+        <subpackage name="authorizer">
+            <allow pkg="org.apache.kafka.common.acl" />
+            <allow pkg="org.apache.kafka.common.requests" />
+            <allow pkg="org.apache.kafka.common.resource" />
+            <allow pkg="org.apache.kafka.controller" />
+            <allow pkg="org.apache.kafka.metadata" />
+            <allow pkg="org.apache.kafka.common.internals" />
+        </subpackage>
+        <subpackage name="bootstrap">
+            <allow pkg="org.apache.kafka.snapshot" />
+        </subpackage>
+        <subpackage name="fault">
+            <allow pkg="org.apache.kafka.server.fault" />
+        </subpackage>
+    </subpackage>
+
+    <subpackage name="metalog">
+        <allow pkg="org.apache.kafka.common.metadata" />
+        <allow pkg="org.apache.kafka.common.protocol" />
+        <allow pkg="org.apache.kafka.common.record" />
+        <allow pkg="org.apache.kafka.metadata" />
+        <allow pkg="org.apache.kafka.metalog" />
+        <allow pkg="org.apache.kafka.raft" />
+        <allow pkg="org.apache.kafka.snapshot" />
+        <allow pkg="org.apache.kafka.queue" />
+        <allow pkg="org.apache.kafka.server.common" />
+        <allow pkg="org.apache.kafka.test" />
+    </subpackage>
+
+</import-control>
diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
new file mode 100644
index 00000000000..d310d81a832
--- /dev/null
+++ b/checkstyle/import-control-server-common.xml
@@ -0,0 +1,82 @@
+<!DOCTYPE import-control PUBLIC
+        "-//Puppy Crawl//DTD Import Control 1.1//EN"
+        "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<import-control pkg="org.apache.kafka">
+
+    <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+
+    <!-- common library dependencies -->
+    <allow pkg="java" />
+    <allow pkg="javax.management" />
+    <allow pkg="org.slf4j" />
+    <allow pkg="org.junit" />
+    <allow pkg="org.opentest4j" />
+    <allow pkg="org.hamcrest" />
+    <allow pkg="org.mockito" />
+    <allow pkg="org.easymock" />
+    <allow pkg="org.powermock" />
+    <allow pkg="java.security" />
+    <allow pkg="javax.net.ssl" />
+    <allow pkg="javax.security" />
+    <allow pkg="org.ietf.jgss" />
+    <allow pkg="net.jqwik.api" />
+
+    <!-- no one depends on the server -->
+    <disallow pkg="kafka" />
+
+    <!-- anyone can use public classes -->
+    <allow pkg="org.apache.kafka.common" exact-match="true" />
+    <allow pkg="org.apache.kafka.common.security" />
+    <allow pkg="org.apache.kafka.common.serialization" />
+    <allow pkg="org.apache.kafka.common.utils" />
+    <allow pkg="org.apache.kafka.common.errors" exact-match="true" />
+    <allow pkg="org.apache.kafka.common.memory" />
+
+    <!-- persistent collection factories/non-library-specific wrappers -->
+    <allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
+
+    <subpackage name="queue">
+        <allow pkg="org.apache.kafka.test" />
+    </subpackage>
+
+    <subpackage name="server">
+        <allow pkg="org.apache.kafka.common" />
+        <allow pkg="joptsimple" />
+
+        <subpackage name="common">
+            <allow pkg="org.apache.kafka.server.common" />
+        </subpackage>
+
+        <subpackage name="immutable">
+            <allow pkg="org.apache.kafka.server.util"/>
+            <!-- only the factory package can use persistent collection library-specific wrapper implementations -->
+            <!-- the library-specific wrapper implementation for PCollections -->
+            <allow pkg="org.apache.kafka.server.immutable.pcollections" />
+            <subpackage name="pcollections">
+                <allow pkg="org.pcollections" />
+            </subpackage>
+        </subpackage>
+
+        <subpackage name="metrics">
+            <allow pkg="com.yammer.metrics" />
+        </subpackage>
+    </subpackage>
+
+</import-control>
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 72948e541d5..c3318807f20 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -88,14 +88,6 @@
       <allow pkg="org.apache.kafka.common.record" />
     </subpackage>
 
-    <subpackage name="metadata">
-      <allow pkg="com.fasterxml.jackson" />
-      <allow pkg="org.apache.kafka.common.protocol" />
-      <allow pkg="org.apache.kafka.common.protocol.types" />
-      <allow pkg="org.apache.kafka.common.message" />
-      <allow pkg="org.apache.kafka.common.metadata" />
-    </subpackage>
-
     <subpackage name="metrics">
       <allow pkg="org.apache.kafka.common.metrics" />
     </subpackage>
@@ -206,122 +198,6 @@
     </subpackage>
   </subpackage>
 
-  <subpackage name="controller">
-    <allow pkg="com.yammer.metrics"/>
-    <allow pkg="org.apache.kafka.clients" />
-    <allow pkg="org.apache.kafka.clients.admin" />
-    <allow pkg="org.apache.kafka.common.acl" />
-    <allow pkg="org.apache.kafka.common.annotation" />
-    <allow pkg="org.apache.kafka.common.config" />
-    <allow pkg="org.apache.kafka.common.feature" />
-    <allow pkg="org.apache.kafka.common.internals" />
-    <allow pkg="org.apache.kafka.common.message" />
-    <allow pkg="org.apache.kafka.common.metadata" />
-    <allow pkg="org.apache.kafka.common.metrics" />
-    <allow pkg="org.apache.kafka.common.network" />
-    <allow pkg="org.apache.kafka.common.protocol" />
-    <allow pkg="org.apache.kafka.common.quota" />
-    <allow pkg="org.apache.kafka.common.record" />
-    <allow pkg="org.apache.kafka.common.requests" />
-    <allow pkg="org.apache.kafka.common.resource" />
-    <allow pkg="org.apache.kafka.controller" />
-    <allow pkg="org.apache.kafka.image" />
-    <allow pkg="org.apache.kafka.image.writer" />
-    <allow pkg="org.apache.kafka.metadata" />
-    <allow pkg="org.apache.kafka.metadata.authorizer" />
-    <allow pkg="org.apache.kafka.metadata.migration" />
-    <allow pkg="org.apache.kafka.metalog" />
-    <allow pkg="org.apache.kafka.queue" />
-    <allow pkg="org.apache.kafka.raft" />
-    <allow pkg="org.apache.kafka.server.authorizer" />
-    <allow pkg="org.apache.kafka.server.common" />
-    <allow pkg="org.apache.kafka.server.config" />
-    <allow pkg="org.apache.kafka.server.fault" />
-    <allow pkg="org.apache.kafka.server.metrics" />
-    <allow pkg="org.apache.kafka.server.policy"/>
-    <allow pkg="org.apache.kafka.server.util"/>
-    <allow pkg="org.apache.kafka.snapshot" />
-    <allow pkg="org.apache.kafka.test" />
-    <allow pkg="org.apache.kafka.timeline" />
-  </subpackage>
-
-  <subpackage name="image">
-    <allow pkg="org.apache.kafka.common.config" />
-    <allow pkg="org.apache.kafka.common.message" />
-    <allow pkg="org.apache.kafka.common.metadata" />
-    <allow pkg="org.apache.kafka.common.protocol" />
-    <allow pkg="org.apache.kafka.common.quota" />
-    <allow pkg="org.apache.kafka.common.record" />
-    <allow pkg="org.apache.kafka.common.requests" />
-    <allow pkg="org.apache.kafka.common.resource" />
-    <allow pkg="org.apache.kafka.image" />
-    <allow pkg="org.apache.kafka.image.writer" />
-    <allow pkg="org.apache.kafka.metadata" />
-    <allow pkg="org.apache.kafka.queue" />
-    <allow pkg="org.apache.kafka.clients.admin" />
-    <allow pkg="org.apache.kafka.raft" />
-    <allow pkg="org.apache.kafka.server.common" />
-    <allow pkg="org.apache.kafka.server.fault" />
-    <allow pkg="org.apache.kafka.server.util" />
-    <allow pkg="org.apache.kafka.snapshot" />
-    <allow pkg="org.apache.kafka.test" />
-  </subpackage>
-
-  <subpackage name="metadata">
-    <allow pkg="org.apache.kafka.clients" />
-    <allow pkg="org.apache.kafka.common.acl" />
-    <allow pkg="org.apache.kafka.common.annotation" />
-    <allow pkg="org.apache.kafka.common.config" />
-    <allow pkg="org.apache.kafka.common.message" />
-    <allow pkg="org.apache.kafka.common.metadata" />
-    <allow pkg="org.apache.kafka.common.protocol" />
-    <allow pkg="org.apache.kafka.common.record" />
-    <allow pkg="org.apache.kafka.common.resource" />
-    <allow pkg="org.apache.kafka.common.requests" />
-    <allow pkg="org.apache.kafka.image" />
-    <allow pkg="org.apache.kafka.metadata" />
-    <allow pkg="org.apache.kafka.metalog" />
-    <allow pkg="org.apache.kafka.queue" />
-    <allow pkg="org.apache.kafka.raft" />
-    <allow pkg="org.apache.kafka.server.authorizer" />
-    <allow pkg="org.apache.kafka.server.common" />
-    <allow pkg="org.apache.kafka.server.fault" />
-    <allow pkg="org.apache.kafka.server.config" />
-    <allow pkg="org.apache.kafka.server.util"/>
-    <allow pkg="org.apache.kafka.test" />
-    <subpackage name="authorizer">
-      <allow pkg="org.apache.kafka.common.acl" />
-      <allow pkg="org.apache.kafka.common.requests" />
-      <allow pkg="org.apache.kafka.common.resource" />
-      <allow pkg="org.apache.kafka.controller" />
-      <allow pkg="org.apache.kafka.metadata" />
-      <allow pkg="org.apache.kafka.common.internals" />
-    </subpackage>
-    <subpackage name="bootstrap">
-      <allow pkg="org.apache.kafka.snapshot" />
-    </subpackage>
-    <subpackage name="fault">
-      <allow pkg="org.apache.kafka.server.fault" />
-    </subpackage>
-  </subpackage>
-
-  <subpackage name="metalog">
-    <allow pkg="org.apache.kafka.common.metadata" />
-    <allow pkg="org.apache.kafka.common.protocol" />
-    <allow pkg="org.apache.kafka.common.record" />
-    <allow pkg="org.apache.kafka.metadata" />
-    <allow pkg="org.apache.kafka.metalog" />
-    <allow pkg="org.apache.kafka.raft" />
-    <allow pkg="org.apache.kafka.snapshot" />
-    <allow pkg="org.apache.kafka.queue" />
-    <allow pkg="org.apache.kafka.server.common" />
-    <allow pkg="org.apache.kafka.test" />
-  </subpackage>
-
-  <subpackage name="queue">
-    <allow pkg="org.apache.kafka.test" />
-  </subpackage>
-
   <subpackage name="clients">
     <allow pkg="org.apache.kafka.common" />
     <allow pkg="org.apache.kafka.clients" exact-match="true"/>
@@ -358,19 +234,10 @@
 
   <subpackage name="server">
     <allow pkg="org.apache.kafka.common" />
-    <allow pkg="joptsimple" />
 
     <!-- This is required to make AlterConfigPolicyTest work. -->
     <allow pkg="org.apache.kafka.server.policy" />
 
-    <subpackage name="common">
-      <allow pkg="org.apache.kafka.server.common" />
-    </subpackage>
-
-    <subpackage name="metrics">
-      <allow pkg="com.yammer.metrics" />
-    </subpackage>
-
     <subpackage name="log">
       <allow pkg="com.fasterxml.jackson" />
       <allow pkg="kafka.api" />
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index ffe9b3f4076..d7d332ea7fb 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -177,9 +177,9 @@ class BrokerMetadataPublisherTest {
   private def topicsImage(
     topics: Seq[TopicImage]
   ): TopicsImage = {
-    val idsMap = topics.map(t => t.id -> t).toMap
-    val namesMap = topics.map(t => t.name -> t).toMap
-    new TopicsImage(idsMap.asJava, namesMap.asJava)
+    var retval = TopicsImage.EMPTY
+    topics.foreach { t => retval = retval.including(t) }
+    retval
   }
 
   private def newMockDynamicConfigPublisher(
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 40d408949db..23636469f71 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -108,6 +108,7 @@ versions += [
   metrics: "2.2.0",
   mockito: "4.9.0",
   netty: "4.1.86.Final",
+  pcollections: "4.0.1",
   powermock: "2.0.9",
   reflections: "0.9.12",
   reload4j: "1.2.19",
@@ -198,6 +199,7 @@ libs += [
   mockitoJunitJupiter: "org.mockito:mockito-junit-jupiter:$versions.mockito",
   nettyHandler: "io.netty:netty-handler:$versions.netty",
   nettyTransportNativeEpoll: "io.netty:netty-transport-native-epoll:$versions.netty",
+  pcollections: "org.pcollections:pcollections:$versions.pcollections",
   powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock",
   powermockEasymock: "org.powermock:powermock-api-easymock:$versions.powermock",
   reflections: "org.reflections:reflections:$versions.reflections",
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
new file mode 100644
index 00000000000..8e997385fc3
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.metadata;
+
+import kafka.coordinator.transaction.TransactionCoordinator;
+import kafka.network.RequestChannel;
+import kafka.network.RequestConvertToJson;
+import kafka.server.AutoTopicCreationManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.ClientQuotaManager;
+import kafka.server.ClientRequestQuotaManager;
+import kafka.server.ControllerMutationQuotaManager;
+import kafka.server.FetchManager;
+import kafka.server.ForwardingManager;
+import kafka.server.KafkaApis;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.MetadataCache;
+import kafka.server.QuotaFactory;
+import kafka.server.RaftSupport;
+import kafka.server.ReplicaManager;
+import kafka.server.ReplicationQuotaManager;
+import kafka.server.SimpleApiVersionManager;
+import kafka.server.builders.KafkaApisBuilder;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.MockConfigRepository;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupCoordinator;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.mockito.Mockito;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import scala.Option;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+
+public class KRaftMetadataRequestBenchmark {
+    @Param({"500", "1000",  "5000"})
+    private int topicCount;
+    @Param({"10", "20", "50"})
+    private int partitionCount;
+
+    private RequestChannel requestChannel = Mockito.mock(RequestChannel.class, Mockito.withSettings().stubOnly());
+    private RequestChannel.Metrics requestChannelMetrics = Mockito.mock(RequestChannel.Metrics.class);
+    private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+    private GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class);
+    private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
+    private AutoTopicCreationManager autoTopicCreationManager = Mockito.mock(AutoTopicCreationManager.class);
+    private Metrics metrics = new Metrics();
+    private int brokerId = 1;
+    private ForwardingManager forwardingManager = Mockito.mock(ForwardingManager.class);
+    private KRaftMetadataCache metadataCache = MetadataCache.kRaftMetadataCache(brokerId);
+    private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
+    private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
+    private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
+    private ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class);
+    private QuotaFactory.QuotaManagers quotaManagers = new QuotaFactory.QuotaManagers(clientQuotaManager,
+            clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
+            replicaQuotaManager, replicaQuotaManager, Option.empty());
+    private FetchManager fetchManager = Mockito.mock(FetchManager.class);
+    private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+    private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
+    private KafkaApis kafkaApis;
+    private RequestChannel.Request allTopicMetadataRequest;
+
+    @Setup(Level.Trial)
+    public void setup() {
+        initializeMetadataCache();
+        kafkaApis = createKafkaApis();
+        allTopicMetadataRequest = buildAllTopicMetadataRequest();
+    }
+
+    private void initializeMetadataCache() {
+        MetadataDelta buildupMetadataDelta = new MetadataDelta(MetadataImage.EMPTY);
+        IntStream.range(0, 5).forEach(brokerId -> {
+            RegisterBrokerRecord.BrokerEndpointCollection endpoints = new RegisterBrokerRecord.BrokerEndpointCollection();
+            endpoints(brokerId).forEach(endpoint ->
+                endpoints.add(new RegisterBrokerRecord.BrokerEndpoint().
+                    setHost(endpoint.host()).
+                    setPort(endpoint.port()).
+                    setName(endpoint.listener()).
+                    setSecurityProtocol(endpoint.securityProtocol())));
+            buildupMetadataDelta.replay(new RegisterBrokerRecord().
+                setBrokerId(brokerId).
+                setBrokerEpoch(100L).
+                setFenced(false).
+                setRack(null).
+                setEndPoints(endpoints).
+                setIncarnationId(Uuid.fromString(Uuid.randomUuid().toString())));
+        });
+        IntStream.range(0, topicCount).forEach(topicNum -> {
+            Uuid topicId = Uuid.randomUuid();
+            buildupMetadataDelta.replay(new TopicRecord().setName("topic-" + topicNum).setTopicId(topicId));
+            IntStream.range(0, partitionCount).forEach(partitionId ->
+                buildupMetadataDelta.replay(new PartitionRecord().
+                    setPartitionId(partitionId).
+                    setTopicId(topicId).
+                    setReplicas(Arrays.asList(0, 1, 3)).
+                    setIsr(Arrays.asList(0, 1, 3)).
+                    setRemovingReplicas(Collections.emptyList()).
+                    setAddingReplicas(Collections.emptyList()).
+                    setLeader(partitionCount % 5).
+                    setLeaderEpoch(0)));
+        });
+        metadataCache.setImage(buildupMetadataDelta.apply(MetadataProvenance.EMPTY));
+    }
+
+    private List<UpdateMetadataEndpoint> endpoints(final int brokerId) {
+        return Collections.singletonList(
+                new UpdateMetadataEndpoint()
+                        .setHost("host_" + brokerId)
+                        .setPort(9092)
+                        .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+                        .setListener(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value()));
+    }
+
+    private KafkaApis createKafkaApis() {
+        Properties kafkaProps =  new Properties();
+        kafkaProps.put(KafkaConfig$.MODULE$.NodeIdProp(), brokerId + "");
+        kafkaProps.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
+        kafkaProps.put(KafkaConfig$.MODULE$.QuorumVotersProp(), "9000@foo:8092");
+        kafkaProps.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), "CONTROLLER");
+        KafkaConfig config = new KafkaConfig(kafkaProps);
+        return new KafkaApisBuilder().
+                setRequestChannel(requestChannel).
+                setMetadataSupport(new RaftSupport(forwardingManager, metadataCache)).
+                setReplicaManager(replicaManager).
+                setGroupCoordinator(groupCoordinator).
+                setTxnCoordinator(transactionCoordinator).
+                setAutoTopicCreationManager(autoTopicCreationManager).
+                setBrokerId(brokerId).
+                setConfig(config).
+                setConfigRepository(new MockConfigRepository()).
+                setMetadataCache(metadataCache).
+                setMetrics(metrics).
+                setAuthorizer(Optional.empty()).
+                setQuotas(quotaManagers).
+                setFetchManager(fetchManager).
+                setBrokerTopicStats(brokerTopicStats).
+                setClusterId("clusterId").
+                setTime(Time.SYSTEM).
+                setTokenManager(null).
+                setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false)).
+                build();
+    }
+
+    @TearDown(Level.Trial)
+    public void tearDown() {
+        kafkaApis.close();
+        metrics.close();
+    }
+
+    private RequestChannel.Request buildAllTopicMetadataRequest() {
+        MetadataRequest metadataRequest = MetadataRequest.Builder.allTopics().build();
+        RequestHeader header = new RequestHeader(metadataRequest.apiKey(), metadataRequest.version(), "", 0);
+        ByteBuffer bodyBuffer = metadataRequest.serialize();
+
+        RequestContext context = new RequestContext(header, "1", null, principal,
+                ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+                SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
+        return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, bodyBuffer, requestChannelMetrics, Option.empty());
+    }
+
+    @Benchmark
+    public void testMetadataRequestForAllTopics() {
+        kafkaApis.handleTopicMetadataRequest(allTopicMetadataRequest);
+    }
+
+    @Benchmark
+    public String testRequestToJson() {
+        return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), allTopicMetadataRequest.requestLog(), allTopicMetadataRequest.isForwarded()).toString();
+    }
+
+    @Benchmark
+    public void testTopicIdInfo() {
+        metadataCache.topicIdInfo();
+    }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSingleRecordChangeBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSingleRecordChangeBenchmark.java
new file mode 100644
index 00000000000..8f88a7a1e6f
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSingleRecordChangeBenchmark.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.metadata;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class TopicsImageSingleRecordChangeBenchmark {
+    @Param({"12500", "25000", "50000", "100000"})
+    private int totalTopicCount;
+    @Param({"10"})
+    private int partitionsPerTopic;
+    @Param({"3"})
+    private int replicationFactor;
+    @Param({"10000"})
+    private int numReplicasPerBroker;
+
+    private TopicsDelta topicsDelta;
+
+
+    @Setup(Level.Trial)
+    public void setup() {
+        // build an image containing all the specified topics and partitions
+        TopicsDelta buildupTopicsDelta = TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
+        TopicsImage builtupTopicsImage = buildupTopicsDelta.apply();
+        // build a delta to apply within the benchmark code
+        // that adds a single topic-partition
+        topicsDelta = new TopicsDelta(builtupTopicsImage);
+        Uuid newTopicUuid = Uuid.randomUuid();
+        TopicRecord newTopicRecord = new TopicRecord().setName("newtopic").setTopicId(newTopicUuid);
+        topicsDelta.replay(newTopicRecord);
+        ArrayList<Integer> replicas = TopicsImageSnapshotLoadBenchmark.getReplicas(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker, 0);
+        ArrayList<Integer> isr = new ArrayList<>(replicas);
+        PartitionRecord newPartitionRecord = new PartitionRecord().
+            setPartitionId(0).
+            setTopicId(newTopicUuid).
+            setReplicas(replicas).
+            setIsr(isr).
+            setRemovingReplicas(Collections.emptyList()).
+            setAddingReplicas(Collections.emptyList()).
+            setLeader(0);
+        topicsDelta.replay(newPartitionRecord);
+        System.out.print("(Adding a single topic to metadata having " + totalTopicCount + " total topics) ");
+    }
+
+    @Benchmark
+    public void testTopicsDeltaSingleTopicAdd() {
+        topicsDelta.apply();
+    }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSnapshotLoadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSnapshotLoadBenchmark.java
new file mode 100644
index 00000000000..10961d9d025
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSnapshotLoadBenchmark.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.metadata;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TopicsImageSnapshotLoadBenchmark {
+    @Param({"12500", "25000", "50000", "100000"})
+    private int totalTopicCount;
+    @Param({"10"})
+    private int partitionsPerTopic;
+    @Param({"3"})
+    private int replicationFactor;
+    @Param({"10000"})
+    private int numReplicasPerBroker;
+
+    private TopicsDelta topicsDelta;
+
+
+    @Setup(Level.Trial)
+    public void setup() {
+        // build a delta to apply within the benchmark code
+        // that consists of all the topics and partitions that would get loaded in a snapshot
+        topicsDelta = getInitialTopicsDelta(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
+        System.out.print("(Loading a snapshot containing " + totalTopicCount + " total topics) ");
+    }
+
+    static TopicsDelta getInitialTopicsDelta(int totalTopicCount, int partitionsPerTopic, int replicationFactor, int numReplicasPerBroker) {
+        int numBrokers = getNumBrokers(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
+        TopicsDelta buildupTopicsDelta = new TopicsDelta(TopicsImage.EMPTY);
+        final AtomicInteger currentLeader = new AtomicInteger(0);
+        IntStream.range(0, totalTopicCount).forEach(topicNumber -> {
+            Uuid topicId = Uuid.randomUuid();
+            buildupTopicsDelta.replay(new TopicRecord().setName("topic" + topicNumber).setTopicId(topicId));
+            IntStream.range(0, partitionsPerTopic).forEach(partitionNumber -> {
+                ArrayList<Integer> replicas = getReplicas(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker, currentLeader.get());
+                ArrayList<Integer> isr = new ArrayList<>(replicas);
+                buildupTopicsDelta.replay(new PartitionRecord().
+                    setPartitionId(partitionNumber).
+                    setTopicId(topicId).
+                    setReplicas(replicas).
+                    setIsr(isr).
+                    setRemovingReplicas(Collections.emptyList()).
+                    setAddingReplicas(Collections.emptyList()).
+                    setLeader(currentLeader.get()));
+                currentLeader.set((1 + currentLeader.get()) % numBrokers);
+            });
+        });
+        return buildupTopicsDelta;
+    }
+
+    static ArrayList<Integer> getReplicas(int totalTopicCount, int partitionsPerTopic, int replicationFactor, int numReplicasPerBroker, int currentLeader) {
+        ArrayList<Integer> replicas = new ArrayList<>();
+        int numBrokers = getNumBrokers(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
+        IntStream.range(0, replicationFactor).forEach(replicaNumber ->
+            replicas.add((replicaNumber + currentLeader) % numBrokers));
+        return replicas;
+    }
+
+    static int getNumBrokers(int totalTopicCount, int partitionsPerTopic, int replicationFactor, int numReplicasPerBroker) {
+        int numBrokers = totalTopicCount * partitionsPerTopic * replicationFactor / numReplicasPerBroker;
+        return numBrokers - numBrokers % 3;
+    }
+
+    @Benchmark
+    public void testTopicsDeltaSnapshotLoad() {
+        topicsDelta.apply();
+    }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageZonalOutageBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageZonalOutageBenchmark.java
new file mode 100644
index 00000000000..5890763397a
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageZonalOutageBenchmark.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.metadata;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TopicsImageZonalOutageBenchmark {
+    @Param({"12500", "25000", "50000", "100000"})
+    private int totalTopicCount;
+    @Param({"10"})
+    private int partitionsPerTopic;
+    @Param({"3"})
+    private int replicationFactor;
+    @Param({"10000"})
+    private int numReplicasPerBroker;
+
+    private TopicsDelta topicsDelta;
+
+
+    @Setup(Level.Trial)
+    public void setup() {
+        // build an image containing all of the specified topics and partitions
+        TopicsDelta buildupTopicsDelta = TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
+        TopicsImage builtupTopicsImage = buildupTopicsDelta.apply();
+        // build a delta to apply within the benchmark code
+        // that perturbs all the topic-partitions for broker 0
+        // (as might happen in a zonal outage, one broker at a time, ultimately across 1/3 of the brokers in the cluster).
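+        // Every partition whose ISR drops below replicationFactor once broker 0 is removed gets a PartitionRecord with the reduced ISR and its first member as leader.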
+        topicsDelta = new TopicsDelta(builtupTopicsImage);
+        Set<Uuid> perturbedTopics = new HashSet<>();
+        builtupTopicsImage.topicsById().forEach((topicId, topicImage) ->
+            topicImage.partitions().forEach((partitionNumber, partitionRegistration) -> {
+                List<Integer> newIsr = Arrays.stream(partitionRegistration.isr).boxed().filter(n -> n != 0).collect(Collectors.toList());
+                if (newIsr.size() < replicationFactor) {
+                    perturbedTopics.add(topicId);
+                    topicsDelta.replay(new PartitionRecord().
+                        setPartitionId(partitionNumber).
+                        setTopicId(topicId).
+                        setReplicas(Arrays.stream(partitionRegistration.replicas).boxed().collect(Collectors.toList())).
+                        setIsr(newIsr).
+                        setRemovingReplicas(Collections.emptyList()).
+                        setAddingReplicas(Collections.emptyList()).
+                        setLeader(newIsr.get(0)));
+                }
+            })
+        );
+        int numBrokers = TopicsImageSnapshotLoadBenchmark.getNumBrokers(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
+        System.out.print("(Perturbing 1 of " + numBrokers + " brokers, or " + perturbedTopics.size() + " topics within metadata having " + totalTopicCount + " total topics) ");
+    }
+
+    @Benchmark
+    public void testTopicsDeltaZonalOutage() {
+        topicsDelta.apply();
+    }
+}
diff --git a/licenses/pcollections-MIT b/licenses/pcollections-MIT
new file mode 100644
index 00000000000..50519c5e432
--- /dev/null
+++ b/licenses/pcollections-MIT
@@ -0,0 +1,24 @@
+MIT License
+
+Copyright 2008-2011, 2014-2020, 2022 Harold Cooper, gil cattaneo, Gleb Frank,
+Günther Grill, Ilya Gorbunov, Jirka Kremser, Jochen Theodorou, Johnny Lim,
+Liam Miller, Mark Perry, Matei Dragu, Mike Klein, Oleg Osipenko, Ran Ari-Gur,
+Shantanu Kumar, and Valeriy Vyrva.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index d3c5888fa7a..3927e19191d 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -24,13 +24,13 @@ import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.immutable.ImmutableMap;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
 
@@ -126,29 +126,27 @@ public final class TopicsDelta {
     }
 
     public TopicsImage apply() {
-        Map<Uuid, TopicImage> newTopicsById = new HashMap<>(image.topicsById().size());
-        Map<String, TopicImage> newTopicsByName = new HashMap<>(image.topicsByName().size());
-        for (Entry<Uuid, TopicImage> entry : image.topicsById().entrySet()) {
-            Uuid id = entry.getKey();
-            TopicImage prevTopicImage = entry.getValue();
-            TopicDelta delta = changedTopics.get(id);
-            if (delta == null) {
-                if (!deletedTopicIds.contains(id)) {
-                    newTopicsById.put(id, prevTopicImage);
-                    newTopicsByName.put(prevTopicImage.name(), prevTopicImage);
-                }
+        ImmutableMap<Uuid, TopicImage> newTopicsById = image.topicsById();
+        ImmutableMap<String, TopicImage> newTopicsByName = image.topicsByName();
+        // apply all the deletes
+        for (Uuid topicId: deletedTopicIds) {
+            // it was deleted, so we have to remove it from the maps
+            TopicImage originalTopicToBeDeleted = image.topicsById().get(topicId);
+            if (originalTopicToBeDeleted == null) {
+                throw new IllegalStateException("Missing topic id " + topicId);
             } else {
-                TopicImage newTopicImage = delta.apply();
-                newTopicsById.put(id, newTopicImage);
-                newTopicsByName.put(delta.name(), newTopicImage);
+                newTopicsById = newTopicsById.removed(topicId);
+                newTopicsByName = newTopicsByName.removed(originalTopicToBeDeleted.name());
             }
         }
-        for (Entry<Uuid, TopicDelta> entry : changedTopics.entrySet()) {
-            if (!newTopicsById.containsKey(entry.getKey())) {
-                TopicImage newTopicImage = entry.getValue().apply();
-                newTopicsById.put(newTopicImage.id(), newTopicImage);
-                newTopicsByName.put(newTopicImage.name(), newTopicImage);
-            }
+        // apply all the updates/additions
+        for (Map.Entry<Uuid, TopicDelta> entry: changedTopics.entrySet()) {
+            Uuid topicId = entry.getKey();
+            TopicImage newTopicToBeAddedOrUpdated = entry.getValue().apply();
+            // put new information into the maps
+            String topicName = newTopicToBeAddedOrUpdated.name();
+            newTopicsById = newTopicsById.updated(topicId, newTopicToBeAddedOrUpdated);
+            newTopicsByName = newTopicsByName.updated(topicName, newTopicToBeAddedOrUpdated);
         }
         return new TopicsImage(newTopicsById, newTopicsByName);
     }
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
index 5f7db112f0a..569264b1c4c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
@@ -21,41 +21,45 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.server.immutable.ImmutableMap;
 import org.apache.kafka.server.util.TranslatedValueMapView;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-
 /**
  * Represents the topics in the metadata image.
  *
  * This class is thread-safe.
  */
 public final class TopicsImage {
-    public static final TopicsImage EMPTY =
-        new TopicsImage(Collections.emptyMap(), Collections.emptyMap());
+    public static final TopicsImage EMPTY =  new TopicsImage(ImmutableMap.empty(), ImmutableMap.empty());
+
+    private final ImmutableMap<Uuid, TopicImage> topicsById;
+    private final ImmutableMap<String, TopicImage> topicsByName;
 
-    private final Map<Uuid, TopicImage> topicsById;
-    private final Map<String, TopicImage> topicsByName;
+    public TopicsImage(ImmutableMap<Uuid, TopicImage> topicsById,
+                       ImmutableMap<String, TopicImage> topicsByName) {
+        this.topicsById = topicsById;
+        this.topicsByName = topicsByName;
+    }
 
-    public TopicsImage(Map<Uuid, TopicImage> topicsById,
-                       Map<String, TopicImage> topicsByName) {
-        this.topicsById = Collections.unmodifiableMap(topicsById);
-        this.topicsByName = Collections.unmodifiableMap(topicsByName);
+    public TopicsImage including(TopicImage topic) {
+        return new TopicsImage(
+            this.topicsById.updated(topic.id(), topic),
+            this.topicsByName.updated(topic.name(), topic));
     }
 
     public boolean isEmpty() {
         return topicsById.isEmpty() && topicsByName.isEmpty();
     }
 
-    public Map<Uuid, TopicImage> topicsById() {
+    public ImmutableMap<Uuid, TopicImage> topicsById() {
         return topicsById;
     }
 
-    public Map<String, TopicImage> topicsByName() {
+    public ImmutableMap<String, TopicImage> topicsByName() {
         return topicsByName;
     }
 
@@ -74,8 +78,8 @@ public final class TopicsImage {
     }
 
     public void write(ImageWriter writer, ImageWriterOptions options) {
-        for (TopicImage topicImage : topicsById.values()) {
-            topicImage.write(writer, options);
+        for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) {
+            entry.getValue().write(writer, options);
         }
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
index 7781bbdce9f..8be8548433a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
@@ -99,12 +99,10 @@ public class ControllerMetricsTestUtils {
     public static TopicsImage fakeTopicsImage(
         TopicImage... topics
     ) {
-        Map<Uuid, TopicImage> topicsById = new HashMap<>();
-        Map<String, TopicImage> topicsByName = new HashMap<>();
+        TopicsImage image = TopicsImage.EMPTY;
         for (TopicImage topic : topics) {
-            topicsById.put(topic.id(), topic);
-            topicsByName.put(topic.name(), topic);
+            image = image.including(topic);
         }
-        return new TopicsImage(topicsById, topicsByName);
+        return image;
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 8268ec86091..11af3488bf5 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.immutable.ImmutableMap;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -74,18 +75,18 @@ public class TopicsImageTest {
         return new TopicImage(name, id, partitionMap);
     }
 
-    private static Map<Uuid, TopicImage> newTopicsByIdMap(Collection<TopicImage> topics) {
-        Map<Uuid, TopicImage> map = new HashMap<>();
+    private static ImmutableMap<Uuid, TopicImage> newTopicsByIdMap(Collection<TopicImage> topics) {
+        ImmutableMap<Uuid, TopicImage> map = TopicsImage.EMPTY.topicsById();
         for (TopicImage topic : topics) {
-            map.put(topic.id(), topic);
+            map = map.updated(topic.id(), topic);
         }
         return map;
     }
 
-    private static Map<String, TopicImage> newTopicsByNameMap(Collection<TopicImage> topics) {
-        Map<String, TopicImage> map = new HashMap<>();
+    private static ImmutableMap<String, TopicImage> newTopicsByNameMap(Collection<TopicImage> topics) {
+        ImmutableMap<String, TopicImage> map = TopicsImage.EMPTY.topicsByName();
         for (TopicImage topic : topics) {
-            map.put(topic.name(), topic);
+            map = map.updated(topic.name(), topic);
         }
         return map;
     }
diff --git a/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java b/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java
new file mode 100644
index 00000000000..ec9790b3ff3
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableMap;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {
+    /**
+     * @return a wrapped hash-based persistent map that is empty
+     * @param <K> the key type
+     * @param <V> the value type
+     */
+    static <K, V> ImmutableMap<K, V> empty() {
+        return PCollectionsImmutableMap.empty();
+    }
+
+    /**
+     * @param key the key
+     * @param value the value
+     * @return a wrapped hash-based persistent map that has a single mapping
+     * @param <K> the key type
+     * @param <V> the value type
+     */
+    static <K, V> ImmutableMap<K, V> singleton(K key, V value) {
+        return PCollectionsImmutableMap.singleton(key, value);
+    }
+
+    /**
+     * @param key the key
+     * @param value the value
+     * @return a wrapped persistent map that differs from this one in that the given mapping is added (if necessary)
+     */
+    ImmutableMap<K, V> updated(K key, V value);
+
+    /**
+     * @param key the key
+     * @return a wrapped persistent map that differs from this one in that the given mapping is removed (if necessary)
+     */
+    ImmutableMap<K, V> removed(K key);
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableSet.java b/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableSet.java
new file mode 100644
index 00000000000..6f2cfd015a9
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableSet.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableSet;
+
+import java.util.Set;
+
+/**
+ * A persistent Hash-based Set wrapper.
+ * java.util.Set methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <E> the element type
+ */
+public interface ImmutableSet<E> extends Set<E> {
+
+    /**
+     * @return a wrapped hash-based persistent set that is empty
+     * @param <E> the element type
+     */
+    static <E> ImmutableSet<E> empty() {
+        return PCollectionsImmutableSet.empty();
+    }
+
+    /**
+     * @param e the element
+     * @return a wrapped hash-based persistent set that has a single element
+     * @param <E> the element type
+     */
+    static <E> ImmutableSet<E> singleton(E e) {
+        return PCollectionsImmutableSet.singleton(e);
+    }
+
+    /**
+     * @param e the element
+     * @return a wrapped persistent set that differs from this one in that the given element is added (if necessary)
+     */
+    ImmutableSet<E> added(E e);
+
+    /**
+     * @param e the element
+     * @return a wrapped persistent set that differs from this one in that the given element is removed (if necessary)
+     */
+    ImmutableSet<E> removed(E e);
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java b/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java
new file mode 100644
index 00000000000..d808f0db9c3
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable.pcollections;
+
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.pcollections.HashPMap;
+import org.pcollections.HashTreePMap;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+@SuppressWarnings("deprecation")
+public class PCollectionsImmutableMap<K, V> implements ImmutableMap<K, V> {
+
+    private final HashPMap<K, V> underlying;
+
+    /**
+     * @return a wrapped hash-based persistent map that is empty
+     * @param <K> the key type
+     * @param <V> the value type
+     */
+    public static <K, V> PCollectionsImmutableMap<K, V> empty() {
+        return new PCollectionsImmutableMap<>(HashTreePMap.empty());
+    }
+
+    /**
+     * @param key the key
+     * @param value the value
+     * @return a wrapped hash-based persistent map that has a single mapping
+     * @param <K> the key type
+     * @param <V> the value type
+     */
+    public static <K, V> PCollectionsImmutableMap<K, V> singleton(K key, V value) {
+        return new PCollectionsImmutableMap<>(HashTreePMap.singleton(key, value));
+    }
+
+    public PCollectionsImmutableMap(HashPMap<K, V> map) {
+        this.underlying = Objects.requireNonNull(map);
+    }
+
+    @Override
+    public ImmutableMap<K, V> updated(K key, V value) {
+        return new PCollectionsImmutableMap<>(underlying().plus(key, value));
+    }
+
+    @Override
+    public ImmutableMap<K, V> removed(K key) {
+        return new PCollectionsImmutableMap<>(underlying().minus(key));
+    }
+
+    @Override
+    public int size() {
+        return underlying().size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return underlying().isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return underlying().containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return underlying().containsValue(value);
+    }
+
+    @Override
+    public V get(Object key) {
+        return underlying().get(key);
+    }
+
+    @Override
+    public V put(K key, V value) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().put(key, value);
+    }
+
+    @Override
+    public V remove(Object key) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().remove(key);
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> m) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        underlying().putAll(m);
+    }
+
+    @Override
+    public void clear() {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        underlying().clear();
+    }
+
+    @Override
+    public Set<K> keySet() {
+        return underlying().keySet();
+    }
+
+    @Override
+    public Collection<V> values() {
+        return underlying().values();
+    }
+
+    @Override
+    public Set<Entry<K, V>> entrySet() {
+        return underlying().entrySet();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PCollectionsImmutableMap<?, ?> that = (PCollectionsImmutableMap<?, ?>) o;
+        return underlying().equals(that.underlying());
+    }
+
+    @Override
+    public int hashCode() {
+        return underlying().hashCode();
+    }
+
+    @Override
+    public V getOrDefault(Object key, V defaultValue) {
+        return underlying().getOrDefault(key, defaultValue);
+    }
+
+    @Override
+    public void forEach(BiConsumer<? super K, ? super V> action) {
+        underlying().forEach(action);
+    }
+
+    @Override
+    public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        underlying().replaceAll(function);
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().putIfAbsent(key, value);
+    }
+
+    @Override
+    public boolean remove(Object key, Object value) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().remove(key, value);
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().replace(key, oldValue, newValue);
+    }
+
+    @Override
+    public V replace(K key, V value) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().replace(key, value);
+    }
+
+    @Override
+    public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().computeIfAbsent(key, mappingFunction);
+    }
+
+    @Override
+    public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().computeIfPresent(key, remappingFunction);
+    }
+
+    @Override
+    public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().compute(key, remappingFunction);
+    }
+
+    @Override
+    public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().merge(key, value, remappingFunction);
+    }
+
+    @Override
+    public String toString() {
+        return "PCollectionsImmutableMap{" +
+            "underlying=" + underlying() +
+            '}';
+    }
+
+    // package-private for testing
+    HashPMap<K, V> underlying() {
+        return underlying;
+    }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSet.java b/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSet.java
new file mode 100644
index 00000000000..8a50326ef1f
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSet.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable.pcollections;
+
+import org.apache.kafka.server.immutable.ImmutableSet;
+import org.pcollections.HashTreePSet;
+import org.pcollections.MapPSet;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+@SuppressWarnings("deprecation")
+public class PCollectionsImmutableSet<E> implements ImmutableSet<E> {
+    private final MapPSet<E> underlying;
+
+    /**
+     * @return a wrapped hash-based persistent set that is empty
+     * @param <E> the element type
+     */
+    public static <E> PCollectionsImmutableSet<E> empty() {
+        return new PCollectionsImmutableSet<>(HashTreePSet.empty());
+    }
+
+    /**
+     * @param e the element
+     * @return a wrapped hash-based persistent set that has a single element
+     * @param <E> the element type
+     */
+    public static <E> PCollectionsImmutableSet<E> singleton(E e) {
+        return new PCollectionsImmutableSet<>(HashTreePSet.singleton(e));
+    }
+
+    public PCollectionsImmutableSet(MapPSet<E> set) {
+        this.underlying = Objects.requireNonNull(set);
+    }
+
+    @Override
+    public ImmutableSet<E> added(E e) {
+        return new PCollectionsImmutableSet<>(underlying().plus(e));
+    }
+    
+    @Override
+    public ImmutableSet<E> removed(E e) {
+        return new PCollectionsImmutableSet<>(underlying().minus(e));
+    }
+
+    @Override
+    public int size() {
+        return underlying().size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return underlying().isEmpty();
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        return underlying().contains(o);
+    }
+
+    @Override
+    public Iterator<E> iterator() {
+        return underlying.iterator();
+    }
+
+    @Override
+    public void forEach(Consumer<? super E> action) {
+        underlying().forEach(action);
+    }
+
+    @Override
+    public Object[] toArray() {
+        return underlying().toArray();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        return underlying().toArray(a);
+    }
+
+    @Override
+    public boolean add(E e) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().add(e);
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().remove(o);
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        return underlying.containsAll(c);
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends E> c) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().addAll(c);
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().retainAll(c);
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().removeAll(c);
+    }
+
+    @Override
+    public boolean removeIf(Predicate<? super E> filter) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().removeIf(filter);
+    }
+
+    @Override
+    public void clear() {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        underlying().clear();
+    }
+
+    @Override
+    public Spliterator<E> spliterator() {
+        return underlying().spliterator();
+    }
+
+    @Override
+    public Stream<E> stream() {
+        return underlying().stream();
+    }
+
+    @Override
+    public Stream<E> parallelStream() {
+        return underlying().parallelStream();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PCollectionsImmutableSet<?> that = (PCollectionsImmutableSet<?>) o;
+        return Objects.equals(underlying(), that.underlying());
+    }
+
+    @Override
+    public int hashCode() {
+        return underlying().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "PCollectionsImmutableSet{" +
+            "underlying=" + underlying() +
+            '}';
+    }
+
+    // package-private for testing
+    MapPSet<E> underlying() {
+        return this.underlying;
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java b/server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java
new file mode 100644
index 00000000000..e657e9b11c4
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import org.mockito.Mockito;
+
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Facilitate testing of wrapper class delegation.
+ *
+ * We require the following things to test delegation:
+ *
+ * 1. A mock object to which the wrapper is expected to delegate method invocations
+ * 2. A way to define how the mock is expected to behave when its method is invoked
+ * 3. A way to define how to invoke the method on the wrapper
+ * 4. A way to test that the method on the mock is invoked correctly when the wrapper method is invoked
+ * 5. A way to test that any return value from the wrapper method is correct
+ *
+ * @param <D> delegate type
+ * @param <W> wrapper type
+ * @param <T> delegating method return type, if any
+ */
+public abstract class DelegationChecker<D, W, T> {
+    private final D mock;
+    private final W wrapper;
+    private Consumer<D> mockConsumer;
+    private Function<D, T> mockConfigurationFunction;
+    private T mockFunctionReturnValue;
+    private Consumer<W> wrapperConsumer;
+    private Function<W, T> wrapperFunctionApplier;
+    private Function<T, ?> mockFunctionReturnValueTransformation;
+    private boolean expectWrapperToWrapMockFunctionReturnValue;
+    private boolean persistentCollectionMethodInvokedCorrectly = false;
+
+    /**
+     * @param mock mock for the underlying delegate
+     * @param wrapperCreator how to create a wrapper for the mock
+     */
+    protected DelegationChecker(D mock, Function<D, W> wrapperCreator) {
+        this.mock = Objects.requireNonNull(mock);
+        this.wrapper = Objects.requireNonNull(wrapperCreator).apply(mock);
+    }
+
+    /**
+     * @param wrapper the wrapper
+     * @return the underlying delegate for the given wrapper
+     */
+    public abstract D unwrap(W wrapper);
+
+    public DelegationChecker<D, W, T> defineMockConfigurationForVoidMethodInvocation(Consumer<D> mockConsumer) {
+        this.mockConsumer = Objects.requireNonNull(mockConsumer);
+        return this;
+    }
+
+    public DelegationChecker<D, W, T> defineMockConfigurationForFunctionInvocation(Function<D, T> mockConfigurationFunction, T mockFunctionReturnValue) {
+        this.mockConfigurationFunction = Objects.requireNonNull(mockConfigurationFunction);
+        this.mockFunctionReturnValue = mockFunctionReturnValue;
+        return this;
+    }
+
+    public DelegationChecker<D, W, T> defineWrapperVoidMethodInvocation(Consumer<W> wrapperConsumer) {
+        this.wrapperConsumer = Objects.requireNonNull(wrapperConsumer);
+        return this;
+    }
+
+    public <R> DelegationChecker<D, W, T> defineWrapperFunctionInvocationAndMockReturnValueTransformation(
+        Function<W, T> wrapperFunctionApplier,
+        Function<T, R> expectedFunctionReturnValueTransformation) {
+        this.wrapperFunctionApplier = Objects.requireNonNull(wrapperFunctionApplier);
+        this.mockFunctionReturnValueTransformation = Objects.requireNonNull(expectedFunctionReturnValueTransformation);
+        return this;
+    }
+
+    public DelegationChecker<D, W, T> expectWrapperToWrapMockFunctionReturnValue() {
+        this.expectWrapperToWrapMockFunctionReturnValue = true;
+        return this;
+    }
+
+    public void doVoidMethodDelegationCheck() {
+        if (mockConsumer == null || wrapperConsumer == null ||
+            mockConfigurationFunction != null || wrapperFunctionApplier != null ||
+            mockFunctionReturnValue != null || mockFunctionReturnValueTransformation != null) {
+            throwExceptionForIllegalTestSetup();
+        }
+        // configure the mock to behave as desired
+        mockConsumer.accept(Mockito.doAnswer(invocation -> {
+            persistentCollectionMethodInvokedCorrectly = true;
+            return null;
+        }).when(mock));
+        // invoke the wrapper, which should invoke the mock as desired
+        wrapperConsumer.accept(wrapper);
+        // assert that the expected delegation to the mock actually occurred
+        assertTrue(persistentCollectionMethodInvokedCorrectly);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void doFunctionDelegationCheck() {
+        if (mockConfigurationFunction == null || wrapperFunctionApplier == null ||
+            mockFunctionReturnValueTransformation == null ||
+            mockConsumer != null || wrapperConsumer != null) {
+            throwExceptionForIllegalTestSetup();
+        }
+        // configure the mock to behave as desired
+        when(mockConfigurationFunction.apply(mock)).thenAnswer(invocation -> {
+            persistentCollectionMethodInvokedCorrectly = true;
+            return mockFunctionReturnValue;
+        });
+        // invoke the wrapper, which should invoke the mock as desired
+        T wrapperReturnValue = wrapperFunctionApplier.apply(wrapper);
+        // assert that the expected delegation to the mock actually occurred, including any return value transformation
+        assertTrue(persistentCollectionMethodInvokedCorrectly);
+        Object transformedMockFunctionReturnValue = mockFunctionReturnValueTransformation.apply(mockFunctionReturnValue);
+        if (this.expectWrapperToWrapMockFunctionReturnValue) {
+            assertEquals(transformedMockFunctionReturnValue, unwrap((W) wrapperReturnValue));
+        } else {
+            assertEquals(transformedMockFunctionReturnValue, wrapperReturnValue);
+        }
+    }
+
+    private static void throwExceptionForIllegalTestSetup() {
+        throw new IllegalStateException(
+            "test setup error: must define both mock and wrapper consumers or both mock and wrapper functions");
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java b/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java
new file mode 100644
index 00000000000..ab32b32be72
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable.pcollections;
+
+import org.apache.kafka.server.immutable.DelegationChecker;
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.pcollections.HashPMap;
+import org.pcollections.HashTreePMap;
+
+import java.util.Collections;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static java.util.function.Function.identity;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link PCollectionsImmutableMap}.
+ * <p>
+ * Apart from the factory, accessor, equals and hashCode tests, every test wraps a Mockito mock of {@link HashPMap}
+ * and uses a {@link DelegationChecker} to confirm that a wrapper method forwards to the matching method on the
+ * underlying map and, where a value is returned, hands back the expected result (re-wrapped for {@code updated}
+ * and {@code removed}). The test instance itself ({@code this}) is passed wherever an arbitrary key or value is needed.
+ */
+@SuppressWarnings({"unchecked", "deprecation"})
+public class PCollectionsImmutableMapTest {
+    private static final HashPMap<Object, Object> SINGLETON_MAP = HashTreePMap.singleton(new Object(), new Object());
+
+    /**
+     * {@link DelegationChecker} preconfigured with a mocked {@link HashPMap} and the {@link PCollectionsImmutableMap} constructor.
+     *
+     * @param <R> return type of the wrapper function under test
+     */
+    private static final class PCollectionsHashMapWrapperDelegationChecker<R> extends DelegationChecker<HashPMap<Object, Object>, PCollectionsImmutableMap<Object, Object>, R> {
+        public PCollectionsHashMapWrapperDelegationChecker() {
+            super(mock(HashPMap.class), PCollectionsImmutableMap::new);
+        }
+
+        // returns the map held by the wrapper so that a re-wrapped result can be compared with the mock's return value
+        @Override
+        public HashPMap<Object, Object> unwrap(PCollectionsImmutableMap<Object, Object> wrapper) {
+            return wrapper.underlying();
+        }
+    }
+
+    // The ImmutableMap factory methods are expected to yield PCollectionsImmutableMap instances backed by the equivalent HashTreePMap.
+    @Test
+    public void testEmptyMap() {
+        Assertions.assertEquals(HashTreePMap.empty(), ((PCollectionsImmutableMap<?, ?>) ImmutableMap.empty()).underlying());
+    }
+
+    @Test
+    public void testSingletonMap() {
+        Assertions.assertEquals(HashTreePMap.singleton(1, 2), ((PCollectionsImmutableMap<?, ?>) ImmutableMap.singleton(1, 2)).underlying());
+    }
+
+    @Test
+    public void testUnderlying() {
+        assertSame(SINGLETON_MAP, new PCollectionsImmutableMap<>(SINGLETON_MAP).underlying());
+    }
+
+    // Persistent operations: updated()/removed() must delegate to plus()/minus() and return the result wrapped.
+    @Test
+    public void testDelegationOfAfterAdding() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.plus(eq(this), eq(this)), SINGLETON_MAP)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.updated(this, this), identity())
+            .expectWrapperToWrapMockFunctionReturnValue()
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfAfterRemoving() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.minus(eq(this)), SINGLETON_MAP)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removed(this), identity())
+            .expectWrapperToWrapMockFunctionReturnValue()
+            .doFunctionDelegationCheck();
+    }
+
+    // Standard Map methods: each call must be forwarded to the same-named method on the underlying map.
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2})
+    public void testDelegationOfSize(int mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(HashPMap::size, mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::size, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfIsEmpty(boolean mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(HashPMap::isEmpty, mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::isEmpty, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfContainsKey(boolean mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.containsKey(eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.containsKey(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfContainsValue(boolean mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.containsValue(eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.containsValue(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfGet() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.get(eq(this)), new Object())
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.get(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfPut() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.put(eq(this), eq(this)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.put(this, this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfRemoveByKey() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfPutAll() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(mock -> mock.putAll(eq(Collections.emptyMap())))
+            .defineWrapperVoidMethodInvocation(wrapper -> wrapper.putAll(Collections.emptyMap()))
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfClear() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(HashPMap::clear)
+            .defineWrapperVoidMethodInvocation(PCollectionsImmutableMap::clear)
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfKeySet() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(HashPMap::keySet, Collections.emptySet())
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::keySet, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfValues() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(HashPMap::values, Collections.emptySet())
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::values, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfEntrySet() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(HashPMap::entrySet, Collections.emptySet())
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::entrySet, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    // equals() and hashCode() are expected to follow the underlying map rather than the wrapper's identity.
+    @Test
+    public void testEquals() {
+        final HashPMap<Object, Object> mock = mock(HashPMap.class);
+        assertEquals(new PCollectionsImmutableMap<>(mock), new PCollectionsImmutableMap<>(mock));
+        final HashPMap<Object, Object> someOtherMock = mock(HashPMap.class);
+        assertNotEquals(new PCollectionsImmutableMap<>(mock), new PCollectionsImmutableMap<>(someOtherMock));
+    }
+
+    @Test
+    public void testHashCode() {
+        final HashPMap<Object, Object> mock = mock(HashPMap.class);
+        assertEquals(mock.hashCode(), new PCollectionsImmutableMap<>(mock).hashCode());
+        final HashPMap<Object, Object> someOtherMock = mock(HashPMap.class);
+        assertNotEquals(mock.hashCode(), new PCollectionsImmutableMap<>(someOtherMock).hashCode());
+    }
+
+    @Test
+    public void testDelegationOfGetOrDefault() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.getOrDefault(eq(this), eq(this)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.getOrDefault(this, this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfForEach() {
+        final BiConsumer<Object, Object> mockBiConsumer = mock(BiConsumer.class);
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(mock -> mock.forEach(eq(mockBiConsumer)))
+            .defineWrapperVoidMethodInvocation(wrapper -> wrapper.forEach(mockBiConsumer))
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfReplaceAll() {
+        final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(mock -> mock.replaceAll(eq(mockBiFunction)))
+            .defineWrapperVoidMethodInvocation(wrapper -> wrapper.replaceAll(mockBiFunction))
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfPutIfAbsent() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.putIfAbsent(eq(this), eq(this)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.putIfAbsent(this, this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfRemoveByKeyAndValue(boolean mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this), eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this, this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfReplaceWhenMappedToSpecificValue(boolean mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this), eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this, this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfReplaceWhenMappedToAnyValue() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    // NOTE(review): compute*/merge are checked as void invocations, so presumably only the delegation itself (not the returned value) is verified.
+    @Test
+    public void testDelegationOfComputeIfAbsent() {
+        final Function<Object, Object> mockFunction = mock(Function.class);
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(mock -> mock.computeIfAbsent(eq(this), eq(mockFunction)))
+            .defineWrapperVoidMethodInvocation(wrapper -> wrapper.computeIfAbsent(this, mockFunction))
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfComputeIfPresent() {
+        final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(mock -> mock.computeIfPresent(eq(this), eq(mockBiFunction)))
+            .defineWrapperVoidMethodInvocation(wrapper -> wrapper.computeIfPresent(this, mockBiFunction))
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfCompute() {
+        final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(mock -> mock.compute(eq(this), eq(mockBiFunction)))
+            .defineWrapperVoidMethodInvocation(wrapper -> wrapper.compute(this, mockBiFunction))
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfMerge() {
+        final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(mock -> mock.merge(eq(this), eq(this), eq(mockBiFunction)))
+            .defineWrapperVoidMethodInvocation(wrapper -> wrapper.merge(this, this, mockBiFunction))
+            .doVoidMethodDelegationCheck();
+    }
+
+    // toString() is expected to embed the underlying map's text in a fixed envelope.
+    @ParameterizedTest
+    @ValueSource(strings = {"a", "b"})
+    public void testDelegationOfToString(String mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(HashPMap::toString, mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::toString,
+                text -> "PCollectionsImmutableMap{underlying=" + text + "}")
+            .doFunctionDelegationCheck();
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java b/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java
new file mode 100644
index 00000000000..457488404f9
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable.pcollections;
+
+import org.apache.kafka.server.immutable.DelegationChecker;
+import org.apache.kafka.server.immutable.ImmutableSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.pcollections.HashTreePSet;
+import org.pcollections.MapPSet;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import static java.util.function.Function.identity;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link PCollectionsImmutableSet}.
+ * <p>
+ * Apart from the factory, accessor, equals and hashCode tests, every test wraps a Mockito mock of {@link MapPSet}
+ * and uses a {@link DelegationChecker} to confirm that a wrapper method forwards to the matching method on the
+ * underlying set and, where a value is returned, hands back the expected result (re-wrapped for {@code added}
+ * and {@code removed}). The test instance itself ({@code this}) is passed wherever an arbitrary element is needed.
+ */
+@SuppressWarnings({"unchecked", "deprecation"})
+public class PCollectionsImmutableSetTest {
+
+    private static final MapPSet<Object> SINGLETON_SET = HashTreePSet.singleton(new Object());
+
+    /**
+     * {@link DelegationChecker} preconfigured with a mocked {@link MapPSet} and the {@link PCollectionsImmutableSet} constructor.
+     *
+     * @param <R> return type of the wrapper function under test
+     */
+    private static final class PCollectionsHashSetWrapperDelegationChecker<R> extends DelegationChecker<MapPSet<Object>, PCollectionsImmutableSet<Object>, R> {
+        public PCollectionsHashSetWrapperDelegationChecker() {
+            super(mock(MapPSet.class), PCollectionsImmutableSet::new);
+        }
+
+        // returns the set held by the wrapper so that a re-wrapped result can be compared with the mock's return value
+        @Override
+        public MapPSet<Object> unwrap(PCollectionsImmutableSet<Object> wrapper) {
+            return wrapper.underlying();
+        }
+    }
+
+    // The ImmutableSet factory methods are expected to yield PCollectionsImmutableSet instances backed by the equivalent HashTreePSet.
+    @Test
+    public void testEmptySet() {
+        Assertions.assertEquals(HashTreePSet.empty(), ((PCollectionsImmutableSet<?>) ImmutableSet.empty()).underlying());
+    }
+
+    @Test
+    public void testSingletonSet() {
+        Assertions.assertEquals(HashTreePSet.singleton(1), ((PCollectionsImmutableSet<?>) ImmutableSet.singleton(1)).underlying());
+    }
+
+    @Test
+    public void testUnderlying() {
+        assertSame(SINGLETON_SET, new PCollectionsImmutableSet<>(SINGLETON_SET).underlying());
+    }
+
+    // Persistent operations: added()/removed() must delegate to plus()/minus() and return the result wrapped.
+    @Test
+    public void testDelegationOfAfterAdding() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.plus(eq(this)), SINGLETON_SET)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.added(this), identity())
+            .expectWrapperToWrapMockFunctionReturnValue()
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfAfterRemoving() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.minus(eq(this)), SINGLETON_SET)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removed(this), identity())
+            .expectWrapperToWrapMockFunctionReturnValue()
+            .doFunctionDelegationCheck();
+    }
+
+    // Standard Set/Collection methods: each call must be forwarded to the same-named method on the underlying set.
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2})
+    public void testDelegationOfSize(int mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(MapPSet::size, mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::size, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfIsEmpty(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(MapPSet::isEmpty, mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::isEmpty, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfContains(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.contains(eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.contains(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfIterator() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(MapPSet::iterator, mock(Iterator.class))
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::iterator, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfForEach() {
+        final Consumer<Object> mockConsumer = mock(Consumer.class);
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(mock -> mock.forEach(eq(mockConsumer)))
+            .defineWrapperVoidMethodInvocation(wrapper -> wrapper.forEach(mockConsumer))
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfToArray() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(MapPSet::toArray, new Object[0])
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::toArray, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfToArrayIntoGivenDestination() {
+        Object[] destinationArray = new Object[0];
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.toArray(eq(destinationArray)), new Object[0])
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.toArray(destinationArray), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfAdd(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.add(eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.add(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfRemove(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfContainsAll(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.containsAll(eq(Collections.emptyList())), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.containsAll(Collections.emptyList()), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfAddAll(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.addAll(eq(Collections.emptyList())), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.addAll(Collections.emptyList()), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfRetainAll(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.retainAll(eq(Collections.emptyList())), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.retainAll(Collections.emptyList()), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfRemoveAll(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.removeAll(eq(Collections.emptyList())), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removeAll(Collections.emptyList()), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfRemoveIf(boolean mockFunctionReturnValue) {
+        final Predicate<Object> mockPredicate = mock(Predicate.class);
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.removeIf(eq(mockPredicate)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removeIf(mockPredicate), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfClear() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(MapPSet::clear)
+            .defineWrapperVoidMethodInvocation(PCollectionsImmutableSet::clear)
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfSpliterator() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(MapPSet::spliterator, mock(Spliterator.class))
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::spliterator, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfStream() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(MapPSet::stream, mock(Stream.class))
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::stream, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfParallelStream() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(MapPSet::parallelStream, mock(Stream.class))
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::parallelStream, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    // equals() and hashCode() are expected to follow the underlying set rather than the wrapper's identity.
+    @Test
+    public void testEquals() {
+        final MapPSet<Object> mock = mock(MapPSet.class);
+        assertEquals(new PCollectionsImmutableSet<>(mock), new PCollectionsImmutableSet<>(mock));
+        final MapPSet<Object> someOtherMock = mock(MapPSet.class);
+        assertNotEquals(new PCollectionsImmutableSet<>(mock), new PCollectionsImmutableSet<>(someOtherMock));
+    }
+
+    @Test
+    public void testHashCode() {
+        final MapPSet<Object> mock = mock(MapPSet.class);
+        assertEquals(mock.hashCode(), new PCollectionsImmutableSet<>(mock).hashCode());
+        final MapPSet<Object> someOtherMock = mock(MapPSet.class);
+        assertNotEquals(mock.hashCode(), new PCollectionsImmutableSet<>(someOtherMock).hashCode());
+    }
+
+    // toString() is expected to embed the underlying set's text in a fixed envelope.
+    @ParameterizedTest
+    @ValueSource(strings = {"a", "b"})
+    public void testDelegationOfToString(String mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(MapPSet::toString, mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableSet::toString,
+                text -> "PCollectionsImmutableSet{underlying=" + text + "}")
+            .doFunctionDelegationCheck();
+    }
+}