You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rn...@apache.org on 2023/04/17 21:52:40 UTC
[kafka] branch trunk updated: KAFKA-14735: Improve KRaft metadata image change performance at high … (#13280)
This is an automated email from the ASF dual-hosted git repository.
rndgstn pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e27926f92b1 KAFKA-14735: Improve KRaft metadata image change performance at high … (#13280)
e27926f92b1 is described below
commit e27926f92b1f6b34ed6731f33c712a5d0d594275
Author: Ron Dagostino <rn...@gmail.com>
AuthorDate: Mon Apr 17 17:52:28 2023 -0400
KAFKA-14735: Improve KRaft metadata image change performance at high topic counts (#13280)
Introduces the use of persistent data structures in the KRaft metadata image to avoid copying the entire TopicsImage upon every change. Performance that was O(<number of topics in the cluster>) is now O(<number of topics changing>), which has dramatic time and GC improvements for the most common topic-related metadata events. We abstract away the chosen underlying persistent collection library via ImmutableMap<> and ImmutableSet<> interfaces and static factory methods.
Reviewers: Luke Chen <sh...@gmail.com>, Colin P. McCabe <cm...@apache.org>, Ismael Juma <is...@juma.me.uk>, Purshotam Chauhan <pc...@confluent.io>
---
LICENSE-binary | 3 +-
build.gradle | 10 +
checkstyle/import-control-jmh-benchmarks.xml | 1 +
checkstyle/import-control-metadata.xml | 176 ++++++++++++
checkstyle/import-control-server-common.xml | 82 ++++++
checkstyle/import-control.xml | 133 ---------
.../metadata/BrokerMetadataPublisherTest.scala | 6 +-
gradle/dependencies.gradle | 2 +
.../metadata/KRaftMetadataRequestBenchmark.java | 235 ++++++++++++++++
.../TopicsImageSingleRecordChangeBenchmark.java | 90 ++++++
.../metadata/TopicsImageSnapshotLoadBenchmark.java | 112 ++++++++
.../metadata/TopicsImageZonalOutageBenchmark.java | 99 +++++++
licenses/pcollections-MIT | 24 ++
.../java/org/apache/kafka/image/TopicsDelta.java | 40 ++-
.../java/org/apache/kafka/image/TopicsImage.java | 32 ++-
.../metrics/ControllerMetricsTestUtils.java | 8 +-
.../org/apache/kafka/image/TopicsImageTest.java | 13 +-
.../kafka/server/immutable/ImmutableMap.java | 64 +++++
.../kafka/server/immutable/ImmutableSet.java | 60 ++++
.../pcollections/PCollectionsImmutableMap.java | 223 +++++++++++++++
.../pcollections/PCollectionsImmutableSet.java | 188 +++++++++++++
.../kafka/server/immutable/DelegationChecker.java | 146 ++++++++++
.../pcollections/PCollectionsImmutableMapTest.java | 310 +++++++++++++++++++++
.../pcollections/PCollectionsImmutableSetTest.java | 274 ++++++++++++++++++
24 files changed, 2148 insertions(+), 183 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index 842962e61ad..131d64b8633 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -313,7 +313,8 @@ argparse4j-0.7.0, see: licenses/argparse-MIT
jopt-simple-5.0.4, see: licenses/jopt-simple-MIT
slf4j-api-1.7.36, see: licenses/slf4j-MIT
slf4j-reload4j-1.7.36, see: licenses/slf4j-MIT
-classgraph-4.8.138, see: license/classgraph-MIT
+classgraph-4.8.138, see: licenses/classgraph-MIT
+pcollections-4.0.1, see: licenses/pcollections-MIT
---------------------------------------
BSD 2-Clause
diff --git a/build.gradle b/build.gradle
index a4ea31ceed5..773aa7a7769 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1222,6 +1222,10 @@ project(':metadata') {
javadoc {
enabled = false
}
+
+ checkstyle {
+ configProperties = checkstyleConfigProperties("import-control-metadata.xml")
+ }
}
project(':group-coordinator') {
@@ -1554,11 +1558,13 @@ project(':server-common') {
implementation libs.slf4jApi
implementation libs.metrics
implementation libs.joptSimple
+ implementation libs.pcollections
testImplementation project(':clients')
testImplementation project(':clients').sourceSets.test.output
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
+ testImplementation libs.mockitoInline // supports mocking static methods, final classes, etc.
testImplementation libs.hamcrest
testRuntimeOnly libs.slf4jlog4j
@@ -1605,6 +1611,10 @@ project(':server-common') {
clean.doFirst {
delete "$buildDir/kafka/"
}
+
+ checkstyle {
+ configProperties = checkstyleConfigProperties("import-control-server-common.xml")
+ }
}
project(':storage:api') {
diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml
index d6e966498d8..4cbd34f89cb 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -51,6 +51,7 @@
<allow pkg="org.apache.kafka.storage"/>
<allow pkg="org.apache.kafka.clients"/>
<allow pkg="org.apache.kafka.coordinator.group"/>
+ <allow pkg="org.apache.kafka.image"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.timeline" />
diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml
new file mode 100644
index 00000000000..ade866d6a07
--- /dev/null
+++ b/checkstyle/import-control-metadata.xml
@@ -0,0 +1,176 @@
+<!DOCTYPE import-control PUBLIC
+ "-//Puppy Crawl//DTD Import Control 1.1//EN"
+ "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<import-control pkg="org.apache.kafka">
+
+ <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+
+ <!-- common library dependencies -->
+ <allow pkg="java" />
+ <allow pkg="javax.management" />
+ <allow pkg="org.slf4j" />
+ <allow pkg="org.junit" />
+ <allow pkg="org.opentest4j" />
+ <allow pkg="org.hamcrest" />
+ <allow pkg="org.mockito" />
+ <allow pkg="org.easymock" />
+ <allow pkg="org.powermock" />
+ <allow pkg="java.security" />
+ <allow pkg="javax.net.ssl" />
+ <allow pkg="javax.security" />
+ <allow pkg="org.ietf.jgss" />
+ <allow pkg="net.jqwik.api" />
+
+ <!-- no one depends on the server -->
+ <disallow pkg="kafka" />
+
+ <!-- anyone can use public classes -->
+ <allow pkg="org.apache.kafka.common" exact-match="true" />
+ <allow pkg="org.apache.kafka.common.security" />
+ <allow pkg="org.apache.kafka.common.serialization" />
+ <allow pkg="org.apache.kafka.common.utils" />
+ <allow pkg="org.apache.kafka.common.errors" exact-match="true" />
+ <allow pkg="org.apache.kafka.common.memory" />
+
+ <!-- persistent collection factories/non-library-specific wrappers -->
+ <allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
+
+ <subpackage name="common">
+ <subpackage name="metadata">
+ <allow pkg="com.fasterxml.jackson" />
+ <allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.common.protocol.types" />
+ <allow pkg="org.apache.kafka.common.message" />
+ <allow pkg="org.apache.kafka.common.metadata" />
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="controller">
+ <allow pkg="com.yammer.metrics"/>
+ <allow pkg="org.apache.kafka.clients" />
+ <allow pkg="org.apache.kafka.clients.admin" />
+ <allow pkg="org.apache.kafka.common.acl" />
+ <allow pkg="org.apache.kafka.common.annotation" />
+ <allow pkg="org.apache.kafka.common.config" />
+ <allow pkg="org.apache.kafka.common.feature" />
+ <allow pkg="org.apache.kafka.common.internals" />
+ <allow pkg="org.apache.kafka.common.message" />
+ <allow pkg="org.apache.kafka.common.metadata" />
+ <allow pkg="org.apache.kafka.common.metrics" />
+ <allow pkg="org.apache.kafka.common.network" />
+ <allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.common.quota" />
+ <allow pkg="org.apache.kafka.common.requests" />
+ <allow pkg="org.apache.kafka.common.resource" />
+ <allow pkg="org.apache.kafka.controller" />
+ <allow pkg="org.apache.kafka.image" />
+ <allow pkg="org.apache.kafka.image.writer" />
+ <allow pkg="org.apache.kafka.metadata" />
+ <allow pkg="org.apache.kafka.metadata.authorizer" />
+ <allow pkg="org.apache.kafka.metadata.migration" />
+ <allow pkg="org.apache.kafka.metalog" />
+ <allow pkg="org.apache.kafka.queue" />
+ <allow pkg="org.apache.kafka.raft" />
+ <allow pkg="org.apache.kafka.server.authorizer" />
+ <allow pkg="org.apache.kafka.server.common" />
+ <allow pkg="org.apache.kafka.server.config" />
+ <allow pkg="org.apache.kafka.server.fault" />
+ <allow pkg="org.apache.kafka.server.metrics" />
+ <allow pkg="org.apache.kafka.server.policy"/>
+ <allow pkg="org.apache.kafka.server.util"/>
+ <allow pkg="org.apache.kafka.snapshot" />
+ <allow pkg="org.apache.kafka.test" />
+ <allow pkg="org.apache.kafka.timeline" />
+ </subpackage>
+
+ <subpackage name="image">
+ <allow pkg="org.apache.kafka.common.config" />
+ <allow pkg="org.apache.kafka.common.message" />
+ <allow pkg="org.apache.kafka.common.metadata" />
+ <allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.common.quota" />
+ <allow pkg="org.apache.kafka.common.record" />
+ <allow pkg="org.apache.kafka.common.requests" />
+ <allow pkg="org.apache.kafka.common.resource" />
+ <allow pkg="org.apache.kafka.image" />
+ <allow pkg="org.apache.kafka.image.writer" />
+ <allow pkg="org.apache.kafka.metadata" />
+ <allow pkg="org.apache.kafka.queue" />
+ <allow pkg="org.apache.kafka.clients.admin" />
+ <allow pkg="org.apache.kafka.raft" />
+ <allow pkg="org.apache.kafka.server.common" />
+ <allow pkg="org.apache.kafka.server.fault" />
+ <allow pkg="org.apache.kafka.server.util" />
+ <allow pkg="org.apache.kafka.snapshot" />
+ <allow pkg="org.apache.kafka.test" />
+ </subpackage>
+
+ <subpackage name="metadata">
+ <allow pkg="org.apache.kafka.clients" />
+ <allow pkg="org.apache.kafka.common.acl" />
+ <allow pkg="org.apache.kafka.common.annotation" />
+ <allow pkg="org.apache.kafka.common.config" />
+ <allow pkg="org.apache.kafka.common.message" />
+ <allow pkg="org.apache.kafka.common.metadata" />
+ <allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.common.record" />
+ <allow pkg="org.apache.kafka.common.resource" />
+ <allow pkg="org.apache.kafka.common.requests" />
+ <allow pkg="org.apache.kafka.image" />
+ <allow pkg="org.apache.kafka.metadata" />
+ <allow pkg="org.apache.kafka.metalog" />
+ <allow pkg="org.apache.kafka.queue" />
+ <allow pkg="org.apache.kafka.raft" />
+ <allow pkg="org.apache.kafka.server.authorizer" />
+ <allow pkg="org.apache.kafka.server.common" />
+ <allow pkg="org.apache.kafka.server.fault" />
+ <allow pkg="org.apache.kafka.server.config" />
+ <allow pkg="org.apache.kafka.server.util"/>
+ <allow pkg="org.apache.kafka.test" />
+ <subpackage name="authorizer">
+ <allow pkg="org.apache.kafka.common.acl" />
+ <allow pkg="org.apache.kafka.common.requests" />
+ <allow pkg="org.apache.kafka.common.resource" />
+ <allow pkg="org.apache.kafka.controller" />
+ <allow pkg="org.apache.kafka.metadata" />
+ <allow pkg="org.apache.kafka.common.internals" />
+ </subpackage>
+ <subpackage name="bootstrap">
+ <allow pkg="org.apache.kafka.snapshot" />
+ </subpackage>
+ <subpackage name="fault">
+ <allow pkg="org.apache.kafka.server.fault" />
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="metalog">
+ <allow pkg="org.apache.kafka.common.metadata" />
+ <allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.common.record" />
+ <allow pkg="org.apache.kafka.metadata" />
+ <allow pkg="org.apache.kafka.metalog" />
+ <allow pkg="org.apache.kafka.raft" />
+ <allow pkg="org.apache.kafka.snapshot" />
+ <allow pkg="org.apache.kafka.queue" />
+ <allow pkg="org.apache.kafka.server.common" />
+ <allow pkg="org.apache.kafka.test" />
+ </subpackage>
+
+</import-control>
diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml
new file mode 100644
index 00000000000..d310d81a832
--- /dev/null
+++ b/checkstyle/import-control-server-common.xml
@@ -0,0 +1,82 @@
+<!DOCTYPE import-control PUBLIC
+ "-//Puppy Crawl//DTD Import Control 1.1//EN"
+ "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<import-control pkg="org.apache.kafka">
+
+ <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+
+ <!-- common library dependencies -->
+ <allow pkg="java" />
+ <allow pkg="javax.management" />
+ <allow pkg="org.slf4j" />
+ <allow pkg="org.junit" />
+ <allow pkg="org.opentest4j" />
+ <allow pkg="org.hamcrest" />
+ <allow pkg="org.mockito" />
+ <allow pkg="org.easymock" />
+ <allow pkg="org.powermock" />
+ <allow pkg="java.security" />
+ <allow pkg="javax.net.ssl" />
+ <allow pkg="javax.security" />
+ <allow pkg="org.ietf.jgss" />
+ <allow pkg="net.jqwik.api" />
+
+ <!-- no one depends on the server -->
+ <disallow pkg="kafka" />
+
+ <!-- anyone can use public classes -->
+ <allow pkg="org.apache.kafka.common" exact-match="true" />
+ <allow pkg="org.apache.kafka.common.security" />
+ <allow pkg="org.apache.kafka.common.serialization" />
+ <allow pkg="org.apache.kafka.common.utils" />
+ <allow pkg="org.apache.kafka.common.errors" exact-match="true" />
+ <allow pkg="org.apache.kafka.common.memory" />
+
+ <!-- persistent collection factories/non-library-specific wrappers -->
+ <allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
+
+ <subpackage name="queue">
+ <allow pkg="org.apache.kafka.test" />
+ </subpackage>
+
+ <subpackage name="server">
+ <allow pkg="org.apache.kafka.common" />
+ <allow pkg="joptsimple" />
+
+ <subpackage name="common">
+ <allow pkg="org.apache.kafka.server.common" />
+ </subpackage>
+
+ <subpackage name="immutable">
+ <allow pkg="org.apache.kafka.server.util"/>
+ <!-- only the factory package can use persistent collection library-specific wrapper implementations -->
+ <!-- the library-specific wrapper implementation for PCollections -->
+ <allow pkg="org.apache.kafka.server.immutable.pcollections" />
+ <subpackage name="pcollections">
+ <allow pkg="org.pcollections" />
+ </subpackage>
+ </subpackage>
+
+ <subpackage name="metrics">
+ <allow pkg="com.yammer.metrics" />
+ </subpackage>
+ </subpackage>
+
+</import-control>
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 72948e541d5..c3318807f20 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -88,14 +88,6 @@
<allow pkg="org.apache.kafka.common.record" />
</subpackage>
- <subpackage name="metadata">
- <allow pkg="com.fasterxml.jackson" />
- <allow pkg="org.apache.kafka.common.protocol" />
- <allow pkg="org.apache.kafka.common.protocol.types" />
- <allow pkg="org.apache.kafka.common.message" />
- <allow pkg="org.apache.kafka.common.metadata" />
- </subpackage>
-
<subpackage name="metrics">
<allow pkg="org.apache.kafka.common.metrics" />
</subpackage>
@@ -206,122 +198,6 @@
</subpackage>
</subpackage>
- <subpackage name="controller">
- <allow pkg="com.yammer.metrics"/>
- <allow pkg="org.apache.kafka.clients" />
- <allow pkg="org.apache.kafka.clients.admin" />
- <allow pkg="org.apache.kafka.common.acl" />
- <allow pkg="org.apache.kafka.common.annotation" />
- <allow pkg="org.apache.kafka.common.config" />
- <allow pkg="org.apache.kafka.common.feature" />
- <allow pkg="org.apache.kafka.common.internals" />
- <allow pkg="org.apache.kafka.common.message" />
- <allow pkg="org.apache.kafka.common.metadata" />
- <allow pkg="org.apache.kafka.common.metrics" />
- <allow pkg="org.apache.kafka.common.network" />
- <allow pkg="org.apache.kafka.common.protocol" />
- <allow pkg="org.apache.kafka.common.quota" />
- <allow pkg="org.apache.kafka.common.record" />
- <allow pkg="org.apache.kafka.common.requests" />
- <allow pkg="org.apache.kafka.common.resource" />
- <allow pkg="org.apache.kafka.controller" />
- <allow pkg="org.apache.kafka.image" />
- <allow pkg="org.apache.kafka.image.writer" />
- <allow pkg="org.apache.kafka.metadata" />
- <allow pkg="org.apache.kafka.metadata.authorizer" />
- <allow pkg="org.apache.kafka.metadata.migration" />
- <allow pkg="org.apache.kafka.metalog" />
- <allow pkg="org.apache.kafka.queue" />
- <allow pkg="org.apache.kafka.raft" />
- <allow pkg="org.apache.kafka.server.authorizer" />
- <allow pkg="org.apache.kafka.server.common" />
- <allow pkg="org.apache.kafka.server.config" />
- <allow pkg="org.apache.kafka.server.fault" />
- <allow pkg="org.apache.kafka.server.metrics" />
- <allow pkg="org.apache.kafka.server.policy"/>
- <allow pkg="org.apache.kafka.server.util"/>
- <allow pkg="org.apache.kafka.snapshot" />
- <allow pkg="org.apache.kafka.test" />
- <allow pkg="org.apache.kafka.timeline" />
- </subpackage>
-
- <subpackage name="image">
- <allow pkg="org.apache.kafka.common.config" />
- <allow pkg="org.apache.kafka.common.message" />
- <allow pkg="org.apache.kafka.common.metadata" />
- <allow pkg="org.apache.kafka.common.protocol" />
- <allow pkg="org.apache.kafka.common.quota" />
- <allow pkg="org.apache.kafka.common.record" />
- <allow pkg="org.apache.kafka.common.requests" />
- <allow pkg="org.apache.kafka.common.resource" />
- <allow pkg="org.apache.kafka.image" />
- <allow pkg="org.apache.kafka.image.writer" />
- <allow pkg="org.apache.kafka.metadata" />
- <allow pkg="org.apache.kafka.queue" />
- <allow pkg="org.apache.kafka.clients.admin" />
- <allow pkg="org.apache.kafka.raft" />
- <allow pkg="org.apache.kafka.server.common" />
- <allow pkg="org.apache.kafka.server.fault" />
- <allow pkg="org.apache.kafka.server.util" />
- <allow pkg="org.apache.kafka.snapshot" />
- <allow pkg="org.apache.kafka.test" />
- </subpackage>
-
- <subpackage name="metadata">
- <allow pkg="org.apache.kafka.clients" />
- <allow pkg="org.apache.kafka.common.acl" />
- <allow pkg="org.apache.kafka.common.annotation" />
- <allow pkg="org.apache.kafka.common.config" />
- <allow pkg="org.apache.kafka.common.message" />
- <allow pkg="org.apache.kafka.common.metadata" />
- <allow pkg="org.apache.kafka.common.protocol" />
- <allow pkg="org.apache.kafka.common.record" />
- <allow pkg="org.apache.kafka.common.resource" />
- <allow pkg="org.apache.kafka.common.requests" />
- <allow pkg="org.apache.kafka.image" />
- <allow pkg="org.apache.kafka.metadata" />
- <allow pkg="org.apache.kafka.metalog" />
- <allow pkg="org.apache.kafka.queue" />
- <allow pkg="org.apache.kafka.raft" />
- <allow pkg="org.apache.kafka.server.authorizer" />
- <allow pkg="org.apache.kafka.server.common" />
- <allow pkg="org.apache.kafka.server.fault" />
- <allow pkg="org.apache.kafka.server.config" />
- <allow pkg="org.apache.kafka.server.util"/>
- <allow pkg="org.apache.kafka.test" />
- <subpackage name="authorizer">
- <allow pkg="org.apache.kafka.common.acl" />
- <allow pkg="org.apache.kafka.common.requests" />
- <allow pkg="org.apache.kafka.common.resource" />
- <allow pkg="org.apache.kafka.controller" />
- <allow pkg="org.apache.kafka.metadata" />
- <allow pkg="org.apache.kafka.common.internals" />
- </subpackage>
- <subpackage name="bootstrap">
- <allow pkg="org.apache.kafka.snapshot" />
- </subpackage>
- <subpackage name="fault">
- <allow pkg="org.apache.kafka.server.fault" />
- </subpackage>
- </subpackage>
-
- <subpackage name="metalog">
- <allow pkg="org.apache.kafka.common.metadata" />
- <allow pkg="org.apache.kafka.common.protocol" />
- <allow pkg="org.apache.kafka.common.record" />
- <allow pkg="org.apache.kafka.metadata" />
- <allow pkg="org.apache.kafka.metalog" />
- <allow pkg="org.apache.kafka.raft" />
- <allow pkg="org.apache.kafka.snapshot" />
- <allow pkg="org.apache.kafka.queue" />
- <allow pkg="org.apache.kafka.server.common" />
- <allow pkg="org.apache.kafka.test" />
- </subpackage>
-
- <subpackage name="queue">
- <allow pkg="org.apache.kafka.test" />
- </subpackage>
-
<subpackage name="clients">
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.apache.kafka.clients" exact-match="true"/>
@@ -358,19 +234,10 @@
<subpackage name="server">
<allow pkg="org.apache.kafka.common" />
- <allow pkg="joptsimple" />
<!-- This is required to make AlterConfigPolicyTest work. -->
<allow pkg="org.apache.kafka.server.policy" />
- <subpackage name="common">
- <allow pkg="org.apache.kafka.server.common" />
- </subpackage>
-
- <subpackage name="metrics">
- <allow pkg="com.yammer.metrics" />
- </subpackage>
-
<subpackage name="log">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="kafka.api" />
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index ffe9b3f4076..d7d332ea7fb 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -177,9 +177,9 @@ class BrokerMetadataPublisherTest {
private def topicsImage(
topics: Seq[TopicImage]
): TopicsImage = {
- val idsMap = topics.map(t => t.id -> t).toMap
- val namesMap = topics.map(t => t.name -> t).toMap
- new TopicsImage(idsMap.asJava, namesMap.asJava)
+ var retval = TopicsImage.EMPTY
+ topics.foreach { t => retval = retval.including(t) }
+ retval
}
private def newMockDynamicConfigPublisher(
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 40d408949db..23636469f71 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -108,6 +108,7 @@ versions += [
metrics: "2.2.0",
mockito: "4.9.0",
netty: "4.1.86.Final",
+ pcollections: "4.0.1",
powermock: "2.0.9",
reflections: "0.9.12",
reload4j: "1.2.19",
@@ -198,6 +199,7 @@ libs += [
mockitoJunitJupiter: "org.mockito:mockito-junit-jupiter:$versions.mockito",
nettyHandler: "io.netty:netty-handler:$versions.netty",
nettyTransportNativeEpoll: "io.netty:netty-transport-native-epoll:$versions.netty",
+ pcollections: "org.pcollections:pcollections:$versions.pcollections",
powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock",
powermockEasymock: "org.powermock:powermock-api-easymock:$versions.powermock",
reflections: "org.reflections:reflections:$versions.reflections",
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
new file mode 100644
index 00000000000..8e997385fc3
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.metadata;
+
+import kafka.coordinator.transaction.TransactionCoordinator;
+import kafka.network.RequestChannel;
+import kafka.network.RequestConvertToJson;
+import kafka.server.AutoTopicCreationManager;
+import kafka.server.BrokerTopicStats;
+import kafka.server.ClientQuotaManager;
+import kafka.server.ClientRequestQuotaManager;
+import kafka.server.ControllerMutationQuotaManager;
+import kafka.server.FetchManager;
+import kafka.server.ForwardingManager;
+import kafka.server.KafkaApis;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.MetadataCache;
+import kafka.server.QuotaFactory;
+import kafka.server.RaftSupport;
+import kafka.server.ReplicaManager;
+import kafka.server.ReplicationQuotaManager;
+import kafka.server.SimpleApiVersionManager;
+import kafka.server.builders.KafkaApisBuilder;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.MockConfigRepository;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.group.GroupCoordinator;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.mockito.Mockito;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import scala.Option;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+// JMH benchmark: fills a KRaftMetadataCache with 5 brokers and topicCount x partitionCount partitions, then times metadata-request handling, request-to-JSON conversion and topicIdInfo().
+public class KRaftMetadataRequestBenchmark {
+ @Param({"500", "1000", "5000"})
+ private int topicCount; // number of TopicRecords replayed in initializeMetadataCache()
+ @Param({"10", "20", "50"})
+ private int partitionCount; // number of PartitionRecords replayed per topic
+
+ private RequestChannel requestChannel = Mockito.mock(RequestChannel.class, Mockito.withSettings().stubOnly());
+ private RequestChannel.Metrics requestChannelMetrics = Mockito.mock(RequestChannel.Metrics.class);
+ private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
+ private GroupCoordinator groupCoordinator = Mockito.mock(GroupCoordinator.class);
+ private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
+ private AutoTopicCreationManager autoTopicCreationManager = Mockito.mock(AutoTopicCreationManager.class);
+ private Metrics metrics = new Metrics(); // real Metrics instance; closed in tearDown()
+ private int brokerId = 1; // id of the broker under test; also used as the node id in createKafkaApis()
+ private ForwardingManager forwardingManager = Mockito.mock(ForwardingManager.class);
+ private KRaftMetadataCache metadataCache = MetadataCache.kRaftMetadataCache(brokerId); // real cache (not a mock); its image is installed in initializeMetadataCache()
+ private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
+ private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
+ private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
+ private ReplicationQuotaManager replicaQuotaManager = Mockito.mock(ReplicationQuotaManager.class);
+ private QuotaFactory.QuotaManagers quotaManagers = new QuotaFactory.QuotaManagers(clientQuotaManager, // the same client-quota mock is passed for two constructor slots
+ clientQuotaManager, clientRequestQuotaManager, controllerMutationQuotaManager, replicaQuotaManager,
+ replicaQuotaManager, replicaQuotaManager, Option.empty()); // the same replication-quota mock is passed for three constructor slots
+ private FetchManager fetchManager = Mockito.mock(FetchManager.class);
+ private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
+ private KafkaPrincipal principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
+ private KafkaApis kafkaApis; // assigned in setup()
+ private RequestChannel.Request allTopicMetadataRequest; // assigned in setup(); reused by the benchmark methods
+ // Trial-level setup: populates the metadata cache, then builds the KafkaApis instance and the request the benchmarks reuse.
+ @Setup(Level.Trial)
+ public void setup() {
+ initializeMetadataCache(); // must run before the benchmarks so the cache holds the generated image
+ kafkaApis = createKafkaApis();
+ allTopicMetadataRequest = buildAllTopicMetadataRequest();
+ }
+ // Replays broker, topic and partition records into a MetadataDelta and installs the resulting image in metadataCache.
+ private void initializeMetadataCache() {
+ MetadataDelta buildupMetadataDelta = new MetadataDelta(MetadataImage.EMPTY); // start from the empty image
+ IntStream.range(0, 5).forEach(brokerId -> { // registers brokers 0..4; the lambda parameter shadows the brokerId field
+ RegisterBrokerRecord.BrokerEndpointCollection endpoints = new RegisterBrokerRecord.BrokerEndpointCollection();
+ endpoints(brokerId).forEach(endpoint -> // copy each UpdateMetadataEndpoint into a RegisterBrokerRecord.BrokerEndpoint
+ endpoints.add(new RegisterBrokerRecord.BrokerEndpoint().
+ setHost(endpoint.host()).
+ setPort(endpoint.port()).
+ setName(endpoint.listener()).
+ setSecurityProtocol(endpoint.securityProtocol())));
+ buildupMetadataDelta.replay(new RegisterBrokerRecord().
+ setBrokerId(brokerId).
+ setBrokerEpoch(100L).
+ setFenced(false).
+ setRack(null).
+ setEndPoints(endpoints).
+ setIncarnationId(Uuid.fromString(Uuid.randomUuid().toString()))); // NOTE(review): string round-trip of a fresh random Uuid looks redundant - confirm
+ });
+ IntStream.range(0, topicCount).forEach(topicNum -> { // one TopicRecord named "topic-<n>" per topic, each with a random id
+ Uuid topicId = Uuid.randomUuid();
+ buildupMetadataDelta.replay(new TopicRecord().setName("topic-" + topicNum).setTopicId(topicId));
+ IntStream.range(0, partitionCount).forEach(partitionId ->
+ buildupMetadataDelta.replay(new PartitionRecord().
+ setPartitionId(partitionId).
+ setTopicId(topicId).
+ setReplicas(Arrays.asList(0, 1, 3)). // every partition uses the same fixed replica set
+ setIsr(Arrays.asList(0, 1, 3)). // ISR equals the full replica set
+ setRemovingReplicas(Collections.emptyList()).
+ setAddingReplicas(Collections.emptyList()).
+ setLeader(partitionCount % 5). // NOTE(review): derived from partitionCount, not partitionId, so all partitions get the same leader - confirm intended
+ setLeaderEpoch(0)));
+ });
+ metadataCache.setImage(buildupMetadataDelta.apply(MetadataProvenance.EMPTY)); // materialize the delta as an image and publish it to the cache
+ }
+ // Returns a single-element list holding a PLAINTEXT endpoint "host_<brokerId>" on port 9092.
+ private List<UpdateMetadataEndpoint> endpoints(final int brokerId) {
+ return Collections.singletonList(
+ new UpdateMetadataEndpoint()
+ .setHost("host_" + brokerId)
+ .setPort(9092)
+ .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+ .setListener(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value()));
+ }
+ // Builds a KafkaApis for a broker-role node, wired with the mocks above and the real metadataCache.
+ private KafkaApis createKafkaApis() {
+ Properties kafkaProps = new Properties();
+ kafkaProps.put(KafkaConfig$.MODULE$.NodeIdProp(), brokerId + ""); // string concatenation converts the int id to a property value
+ kafkaProps.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
+ kafkaProps.put(KafkaConfig$.MODULE$.QuorumVotersProp(), "9000@foo:8092"); // presumably a placeholder voter that is never contacted - TODO confirm
+ kafkaProps.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), "CONTROLLER");
+ KafkaConfig config = new KafkaConfig(kafkaProps);
+ return new KafkaApisBuilder().
+ setRequestChannel(requestChannel).
+ setMetadataSupport(new RaftSupport(forwardingManager, metadataCache)). // the same cache instance is also passed to setMetadataCache below
+ setReplicaManager(replicaManager).
+ setGroupCoordinator(groupCoordinator).
+ setTxnCoordinator(transactionCoordinator).
+ setAutoTopicCreationManager(autoTopicCreationManager).
+ setBrokerId(brokerId).
+ setConfig(config).
+ setConfigRepository(new MockConfigRepository()).
+ setMetadataCache(metadataCache).
+ setMetrics(metrics).
+ setAuthorizer(Optional.empty()). // no authorizer is configured
+ setQuotas(quotaManagers).
+ setFetchManager(fetchManager).
+ setBrokerTopicStats(brokerTopicStats).
+ setClusterId("clusterId").
+ setTime(Time.SYSTEM).
+ setTokenManager(null).
+ setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.BROKER, false)).
+ build();
+ }
+ // Trial-level teardown: closes the KafkaApis instance and the Metrics registry created for the trial.
+ @TearDown(Level.Trial)
+ public void tearDown() {
+ kafkaApis.close();
+ metrics.close();
+ }
+ // Builds an all-topics MetadataRequest, serializes it, and wraps it in a RequestChannel.Request with a PLAINTEXT context.
+ private RequestChannel.Request buildAllTopicMetadataRequest() {
+ MetadataRequest metadataRequest = MetadataRequest.Builder.allTopics().build();
+ RequestHeader header = new RequestHeader(metadataRequest.apiKey(), metadataRequest.version(), "", 0);
+ ByteBuffer bodyBuffer = metadataRequest.serialize();
+
+ RequestContext context = new RequestContext(header, "1", null, principal,
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+ SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
+ return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, bodyBuffer, requestChannelMetrics, Option.empty()); // requestChannelMetrics is a Mockito mock
+ }
+ // Times KafkaApis.handleTopicMetadataRequest for the prebuilt all-topics request.
+ @Benchmark
+ public void testMetadataRequestForAllTopics() {
+ kafkaApis.handleTopicMetadataRequest(allTopicMetadataRequest);
+ }
+ // Times RequestConvertToJson.requestDesc(...).toString() for the prebuilt request and returns the resulting string.
+ @Benchmark
+ public String testRequestToJson() {
+ return RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), allTopicMetadataRequest.requestLog(), allTopicMetadataRequest.isForwarded()).toString();
+ }
+ // Times metadataCache.topicIdInfo(); the returned value is discarded.
+ @Benchmark
+ public void testTopicIdInfo() {
+ metadataCache.topicIdInfo();
+ }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSingleRecordChangeBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSingleRecordChangeBenchmark.java
new file mode 100644
index 00000000000..8f88a7a1e6f
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSingleRecordChangeBenchmark.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.metadata;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class TopicsImageSingleRecordChangeBenchmark {
+ @Param({"12500", "25000", "50000", "100000"})
+ private int totalTopicCount;
+ @Param({"10"})
+ private int partitionsPerTopic;
+ @Param({"3"})
+ private int replicationFactor;
+ @Param({"10000"})
+ private int numReplicasPerBroker;
+
+ private TopicsDelta topicsDelta;
+
+
+ @Setup(Level.Trial)
+ public void setup() {
+ // build an image containing all the specified topics and partitions
+ TopicsDelta buildupTopicsDelta = TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
+ TopicsImage builtupTopicsImage = buildupTopicsDelta.apply();
+ // build a delta to apply within the benchmark code
+ // that adds a single topic-partition
+ topicsDelta = new TopicsDelta(builtupTopicsImage);
+ Uuid newTopicUuid = Uuid.randomUuid();
+ TopicRecord newTopicRecord = new TopicRecord().setName("newtopic").setTopicId(newTopicUuid);
+ topicsDelta.replay(newTopicRecord);
+ ArrayList<Integer> replicas = TopicsImageSnapshotLoadBenchmark.getReplicas(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker, 0);
+ ArrayList<Integer> isr = new ArrayList<>(replicas);
+ PartitionRecord newPartitionRecord = new PartitionRecord().
+ setPartitionId(0).
+ setTopicId(newTopicUuid).
+ setReplicas(replicas).
+ setIsr(isr).
+ setRemovingReplicas(Collections.emptyList()).
+ setAddingReplicas(Collections.emptyList()).
+ setLeader(0);
+ topicsDelta.replay(newPartitionRecord);
+ System.out.print("(Adding a single topic to metadata having " + totalTopicCount + " total topics) ");
+ }
+
+ @Benchmark
+ public void testTopicsDeltaSingleTopicAdd() {
+ topicsDelta.apply();
+ }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSnapshotLoadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSnapshotLoadBenchmark.java
new file mode 100644
index 00000000000..10961d9d025
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageSnapshotLoadBenchmark.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.metadata;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TopicsImageSnapshotLoadBenchmark {
+ @Param({"12500", "25000", "50000", "100000"})
+ private int totalTopicCount;
+ @Param({"10"})
+ private int partitionsPerTopic;
+ @Param({"3"})
+ private int replicationFactor;
+ @Param({"10000"})
+ private int numReplicasPerBroker;
+
+ private TopicsDelta topicsDelta;
+
+
+ @Setup(Level.Trial)
+ public void setup() {
+ // build a delta to apply within the benchmark code
+ // that consists of all the topics and partitions that would get loaded in a snapshot
+ topicsDelta = getInitialTopicsDelta(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
+ System.out.print("(Loading a snapshot containing " + totalTopicCount + " total topics) ");
+ }
+
+ static TopicsDelta getInitialTopicsDelta(int totalTopicCount, int partitionsPerTopic, int replicationFactor, int numReplicasPerBroker) {
+ int numBrokers = getNumBrokers(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
+ TopicsDelta buildupTopicsDelta = new TopicsDelta(TopicsImage.EMPTY);
+ final AtomicInteger currentLeader = new AtomicInteger(0);
+ IntStream.range(0, totalTopicCount).forEach(topicNumber -> {
+ Uuid topicId = Uuid.randomUuid();
+ buildupTopicsDelta.replay(new TopicRecord().setName("topic" + topicNumber).setTopicId(topicId));
+ IntStream.range(0, partitionsPerTopic).forEach(partitionNumber -> {
+ ArrayList<Integer> replicas = getReplicas(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker, currentLeader.get());
+ ArrayList<Integer> isr = new ArrayList<>(replicas);
+ buildupTopicsDelta.replay(new PartitionRecord().
+ setPartitionId(partitionNumber).
+ setTopicId(topicId).
+ setReplicas(replicas).
+ setIsr(isr).
+ setRemovingReplicas(Collections.emptyList()).
+ setAddingReplicas(Collections.emptyList()).
+ setLeader(currentLeader.get()));
+ currentLeader.set((1 + currentLeader.get()) % numBrokers);
+ });
+ });
+ return buildupTopicsDelta;
+ }
+
+ static ArrayList<Integer> getReplicas(int totalTopicCount, int partitionsPerTopic, int replicationFactor, int numReplicasPerBroker, int currentLeader) {
+ ArrayList<Integer> replicas = new ArrayList<>();
+ int numBrokers = getNumBrokers(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
+ IntStream.range(0, replicationFactor).forEach(replicaNumber ->
+ replicas.add((replicaNumber + currentLeader) % numBrokers));
+ return replicas;
+ }
+
+ static int getNumBrokers(int totalTopicCount, int partitionsPerTopic, int replicationFactor, int numReplicasPerBroker) {
+ int numBrokers = totalTopicCount * partitionsPerTopic * replicationFactor / numReplicasPerBroker;
+ return numBrokers - numBrokers % 3;
+ }
+
+ @Benchmark
+ public void testTopicsDeltaSnapshotLoad() {
+ topicsDelta.apply();
+ }
+}
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageZonalOutageBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageZonalOutageBenchmark.java
new file mode 100644
index 00000000000..5890763397a
--- /dev/null
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/TopicsImageZonalOutageBenchmark.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.metadata;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.image.TopicsDelta;
+import org.apache.kafka.image.TopicsImage;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TopicsImageZonalOutageBenchmark {
+ @Param({"12500", "25000", "50000", "100000"})
+ private int totalTopicCount;
+ @Param({"10"})
+ private int partitionsPerTopic;
+ @Param({"3"})
+ private int replicationFactor;
+ @Param({"10000"})
+ private int numReplicasPerBroker;
+
+ private TopicsDelta topicsDelta;
+
+
+ @Setup(Level.Trial)
+ public void setup() {
+ // build an image containing all of the specified topics and partitions
+ TopicsDelta buildupTopicsDelta = TopicsImageSnapshotLoadBenchmark.getInitialTopicsDelta(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
+ TopicsImage builtupTopicsImage = buildupTopicsDelta.apply();
+ // build a delta to apply within the benchmark code
+ // that perturbs all the topic-partitions for broker 0
+ // (as might happen in a zonal outage, one broker at a time, ultimately across 1/3 of the brokers in the cluster).
+ // Only partitions whose ISR shrinks once broker 0 is removed are replayed into the delta.
+ topicsDelta = new TopicsDelta(builtupTopicsImage);
+ Set<Uuid> perturbedTopics = new HashSet<>();
+ builtupTopicsImage.topicsById().forEach((topicId, topicImage) ->
+ topicImage.partitions().forEach((partitionNumber, partitionRegistration) -> {
+ List<Integer> newIsr = Arrays.stream(partitionRegistration.isr).boxed().filter(n -> n != 0).collect(Collectors.toList());
+ if (newIsr.size() < replicationFactor) {
+ perturbedTopics.add(topicId);
+ topicsDelta.replay(new PartitionRecord().
+ setPartitionId(partitionNumber).
+ setTopicId(topicId).
+ setReplicas(Arrays.stream(partitionRegistration.replicas).boxed().collect(Collectors.toList())).
+ setIsr(newIsr).
+ setRemovingReplicas(Collections.emptyList()).
+ setAddingReplicas(Collections.emptyList()).
+ setLeader(newIsr.get(0)));
+ }
+ })
+ );
+ int numBrokers = TopicsImageSnapshotLoadBenchmark.getNumBrokers(totalTopicCount, partitionsPerTopic, replicationFactor, numReplicasPerBroker);
+ System.out.print("(Perturbing 1 of " + numBrokers + " brokers, or " + perturbedTopics.size() + " topics within metadata having " + totalTopicCount + " total topics) ");
+ }
+
+ @Benchmark
+ public void testTopicsDeltaZonalOutage() {
+ topicsDelta.apply();
+ }
+}
diff --git a/licenses/pcollections-MIT b/licenses/pcollections-MIT
new file mode 100644
index 00000000000..50519c5e432
--- /dev/null
+++ b/licenses/pcollections-MIT
@@ -0,0 +1,24 @@
+MIT License
+
+Copyright 2008-2011, 2014-2020, 2022 Harold Cooper, gil cattaneo, Gleb Frank,
+Günther Grill, Ilya Gorbunov, Jirka Kremser, Jochen Theodorou, Johnny Lim,
+Liam Miller, Mark Perry, Matei Dragu, Mike Klein, Oleg Osipenko, Ran Ari-Gur,
+Shantanu Kumar, and Valeriy Vyrva.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index d3c5888fa7a..3927e19191d 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -24,13 +24,13 @@ import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.immutable.ImmutableMap;
import org.apache.kafka.server.common.MetadataVersion;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
@@ -126,29 +126,27 @@ public final class TopicsDelta {
}
public TopicsImage apply() {
- Map<Uuid, TopicImage> newTopicsById = new HashMap<>(image.topicsById().size());
- Map<String, TopicImage> newTopicsByName = new HashMap<>(image.topicsByName().size());
- for (Entry<Uuid, TopicImage> entry : image.topicsById().entrySet()) {
- Uuid id = entry.getKey();
- TopicImage prevTopicImage = entry.getValue();
- TopicDelta delta = changedTopics.get(id);
- if (delta == null) {
- if (!deletedTopicIds.contains(id)) {
- newTopicsById.put(id, prevTopicImage);
- newTopicsByName.put(prevTopicImage.name(), prevTopicImage);
- }
+ ImmutableMap<Uuid, TopicImage> newTopicsById = image.topicsById();
+ ImmutableMap<String, TopicImage> newTopicsByName = image.topicsByName();
+ // apply all the deletes
+ for (Uuid topicId: deletedTopicIds) {
+ // it was deleted, so we have to remove it from the maps
+ TopicImage originalTopicToBeDeleted = image.topicsById().get(topicId);
+ if (originalTopicToBeDeleted == null) {
+ throw new IllegalStateException("Missing topic id " + topicId);
} else {
- TopicImage newTopicImage = delta.apply();
- newTopicsById.put(id, newTopicImage);
- newTopicsByName.put(delta.name(), newTopicImage);
+ newTopicsById = newTopicsById.removed(topicId);
+ newTopicsByName = newTopicsByName.removed(originalTopicToBeDeleted.name());
}
}
- for (Entry<Uuid, TopicDelta> entry : changedTopics.entrySet()) {
- if (!newTopicsById.containsKey(entry.getKey())) {
- TopicImage newTopicImage = entry.getValue().apply();
- newTopicsById.put(newTopicImage.id(), newTopicImage);
- newTopicsByName.put(newTopicImage.name(), newTopicImage);
- }
+ // apply all the updates/additions
+ for (Map.Entry<Uuid, TopicDelta> entry: changedTopics.entrySet()) {
+ Uuid topicId = entry.getKey();
+ TopicImage newTopicToBeAddedOrUpdated = entry.getValue().apply();
+ // put new information into the maps
+ String topicName = newTopicToBeAddedOrUpdated.name();
+ newTopicsById = newTopicsById.updated(topicId, newTopicToBeAddedOrUpdated);
+ newTopicsByName = newTopicsByName.updated(topicName, newTopicToBeAddedOrUpdated);
}
return new TopicsImage(newTopicsById, newTopicsByName);
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
index 5f7db112f0a..569264b1c4c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsImage.java
@@ -21,41 +21,45 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.server.immutable.ImmutableMap;
import org.apache.kafka.server.util.TranslatedValueMapView;
-import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
-
/**
* Represents the topics in the metadata image.
*
* This class is thread-safe.
*/
public final class TopicsImage {
- public static final TopicsImage EMPTY =
- new TopicsImage(Collections.emptyMap(), Collections.emptyMap());
+ public static final TopicsImage EMPTY = new TopicsImage(ImmutableMap.empty(), ImmutableMap.empty());
+
+ private final ImmutableMap<Uuid, TopicImage> topicsById;
+ private final ImmutableMap<String, TopicImage> topicsByName;
- private final Map<Uuid, TopicImage> topicsById;
- private final Map<String, TopicImage> topicsByName;
+ public TopicsImage(ImmutableMap<Uuid, TopicImage> topicsById,
+ ImmutableMap<String, TopicImage> topicsByName) {
+ this.topicsById = topicsById;
+ this.topicsByName = topicsByName;
+ }
- public TopicsImage(Map<Uuid, TopicImage> topicsById,
- Map<String, TopicImage> topicsByName) {
- this.topicsById = Collections.unmodifiableMap(topicsById);
- this.topicsByName = Collections.unmodifiableMap(topicsByName);
+ public TopicsImage including(TopicImage topic) {
+ return new TopicsImage(
+ this.topicsById.updated(topic.id(), topic),
+ this.topicsByName.updated(topic.name(), topic));
}
public boolean isEmpty() {
return topicsById.isEmpty() && topicsByName.isEmpty();
}
- public Map<Uuid, TopicImage> topicsById() {
+ public ImmutableMap<Uuid, TopicImage> topicsById() {
return topicsById;
}
- public Map<String, TopicImage> topicsByName() {
+ public ImmutableMap<String, TopicImage> topicsByName() {
return topicsByName;
}
@@ -74,8 +78,8 @@ public final class TopicsImage {
}
public void write(ImageWriter writer, ImageWriterOptions options) {
- for (TopicImage topicImage : topicsById.values()) {
- topicImage.write(writer, options);
+ for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) {
+ entry.getValue().write(writer, options);
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
index 7781bbdce9f..8be8548433a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetricsTestUtils.java
@@ -99,12 +99,10 @@ public class ControllerMetricsTestUtils {
public static TopicsImage fakeTopicsImage(
TopicImage... topics
) {
- Map<Uuid, TopicImage> topicsById = new HashMap<>();
- Map<String, TopicImage> topicsByName = new HashMap<>();
+ TopicsImage image = TopicsImage.EMPTY;
for (TopicImage topic : topics) {
- topicsById.put(topic.id(), topic);
- topicsByName.put(topic.name(), topic);
+ image = image.including(topic);
}
- return new TopicsImage(topicsById, topicsByName);
+ return image;
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 8268ec86091..11af3488bf5 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.immutable.ImmutableMap;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -74,18 +75,18 @@ public class TopicsImageTest {
return new TopicImage(name, id, partitionMap);
}
- private static Map<Uuid, TopicImage> newTopicsByIdMap(Collection<TopicImage> topics) {
- Map<Uuid, TopicImage> map = new HashMap<>();
+ private static ImmutableMap<Uuid, TopicImage> newTopicsByIdMap(Collection<TopicImage> topics) {
+ ImmutableMap<Uuid, TopicImage> map = TopicsImage.EMPTY.topicsById();
for (TopicImage topic : topics) {
- map.put(topic.id(), topic);
+ map = map.updated(topic.id(), topic);
}
return map;
}
- private static Map<String, TopicImage> newTopicsByNameMap(Collection<TopicImage> topics) {
- Map<String, TopicImage> map = new HashMap<>();
+ private static ImmutableMap<String, TopicImage> newTopicsByNameMap(Collection<TopicImage> topics) {
+ ImmutableMap<String, TopicImage> map = TopicsImage.EMPTY.topicsByName();
for (TopicImage topic : topics) {
- map.put(topic.name(), topic);
+ map = map.updated(topic.name(), topic);
}
return map;
}
diff --git a/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java b/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java
new file mode 100644
index 00000000000..ec9790b3ff3
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableMap;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {
+ /**
+ * @return a wrapped hash-based persistent map that is empty
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+ static <K, V> ImmutableMap<K, V> empty() {
+ return PCollectionsImmutableMap.empty();
+ }
+
+ /**
+ * @param key the key
+ * @param value the value
+ * @return a wrapped hash-based persistent map that has a single mapping
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+ static <K, V> ImmutableMap<K, V> singleton(K key, V value) {
+ return PCollectionsImmutableMap.singleton(key, value);
+ }
+
+ /**
+ * @param key the key
+ * @param value the value
+ * @return a wrapped persistent map that differs from this one in that the given mapping is added (if necessary)
+ */
+ ImmutableMap<K, V> updated(K key, V value);
+
+ /**
+ * @param key the key
+ * @return a wrapped persistent map that differs from this one in that the given mapping is removed (if necessary)
+ */
+ ImmutableMap<K, V> removed(K key);
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableSet.java b/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableSet.java
new file mode 100644
index 00000000000..6f2cfd015a9
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableSet.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableSet;
+
+import java.util.Set;
+
+/**
+ * A persistent Hash-based Set wrapper.
+ * java.util.Set methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <E> the element type
+ */
+public interface ImmutableSet<E> extends Set<E> {
+
+ /**
+ * @return a wrapped hash-based persistent set that is empty
+ * @param <E> the element type
+ */
+ static <E> ImmutableSet<E> empty() {
+ return PCollectionsImmutableSet.empty();
+ }
+
+ /**
+ * @param e the element
+ * @return a wrapped hash-based persistent set that has a single element
+ * @param <E> the element type
+ */
+ static <E> ImmutableSet<E> singleton(E e) {
+ return PCollectionsImmutableSet.singleton(e);
+ }
+
+ /**
+ * @param e the element
+ * @return a wrapped persistent set that differs from this one in that the given element is added (if necessary)
+ */
+ ImmutableSet<E> added(E e);
+
+ /**
+ * @param e the element
+ * @return a wrapped persistent set that differs from this one in that the given element is removed (if necessary)
+ */
+ ImmutableSet<E> removed(E e);
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java b/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java
new file mode 100644
index 00000000000..d808f0db9c3
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable.pcollections;
+
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.pcollections.HashPMap;
+import org.pcollections.HashTreePMap;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+@SuppressWarnings("deprecation")
+public class PCollectionsImmutableMap<K, V> implements ImmutableMap<K, V> {
+
+ private final HashPMap<K, V> underlying;
+
+ /**
+ * @return a wrapped hash-based persistent map that is empty
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+ public static <K, V> PCollectionsImmutableMap<K, V> empty() {
+ return new PCollectionsImmutableMap<>(HashTreePMap.empty());
+ }
+
+ /**
+ * @param key the key
+ * @param value the value
+ * @return a wrapped hash-based persistent map that has a single mapping
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+ public static <K, V> PCollectionsImmutableMap<K, V> singleton(K key, V value) {
+ return new PCollectionsImmutableMap<>(HashTreePMap.singleton(key, value));
+ }
+
+ public PCollectionsImmutableMap(HashPMap<K, V> map) {
+ this.underlying = Objects.requireNonNull(map);
+ }
+
+ @Override
+ public ImmutableMap<K, V> updated(K key, V value) {
+ return new PCollectionsImmutableMap<>(underlying().plus(key, value));
+ }
+
+ @Override
+ public ImmutableMap<K, V> removed(K key) {
+ return new PCollectionsImmutableMap<>(underlying().minus(key));
+ }
+
+ @Override
+ public int size() {
+ return underlying().size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return underlying().isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return underlying().containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return underlying().containsValue(value);
+ }
+
+ @Override
+ public V get(Object key) {
+ return underlying().get(key);
+ }
+
+ @Override
+ public V put(K key, V value) {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ return underlying().put(key, value);
+ }
+
+ @Override
+ public V remove(Object key) {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ return underlying().remove(key);
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> m) {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ underlying().putAll(m);
+ }
+
+ @Override
+ public void clear() {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ underlying().clear();
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return underlying().keySet();
+ }
+
+ @Override
+ public Collection<V> values() {
+ return underlying().values();
+ }
+
+ @Override
+ public Set<Entry<K, V>> entrySet() {
+ return underlying().entrySet();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ PCollectionsImmutableMap<?, ?> that = (PCollectionsImmutableMap<?, ?>) o;
+ return underlying().equals(that.underlying());
+ }
+
+ @Override
+ public int hashCode() {
+ return underlying().hashCode();
+ }
+
+ @Override
+ public V getOrDefault(Object key, V defaultValue) {
+ return underlying().getOrDefault(key, defaultValue);
+ }
+
+ @Override
+ public void forEach(BiConsumer<? super K, ? super V> action) {
+ underlying().forEach(action);
+ }
+
+ @Override
+ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ underlying().replaceAll(function);
+ }
+
+ @Override
+ public V putIfAbsent(K key, V value) {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ return underlying().putIfAbsent(key, value);
+ }
+
+ @Override
+ public boolean remove(Object key, Object value) {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ return underlying().remove(key, value);
+ }
+
+ @Override
+ public boolean replace(K key, V oldValue, V newValue) {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ return underlying().replace(key, oldValue, newValue);
+ }
+
+ @Override
+ public V replace(K key, V value) {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ return underlying().replace(key, value);
+ }
+
+ @Override
+ public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ return underlying().computeIfAbsent(key, mappingFunction);
+ }
+
+ @Override
+ public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ return underlying().computeIfPresent(key, remappingFunction);
+ }
+
+ @Override
+ public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ return underlying().compute(key, remappingFunction);
+ }
+
+ @Override
+ public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
+ // will throw UnsupportedOperationException; delegate anyway for testability
+ return underlying().merge(key, value, remappingFunction);
+ }
+
+ @Override
+ public String toString() {
+ return "PCollectionsImmutableMap{" +
+ "underlying=" + underlying() +
+ '}';
+ }
+
+ // package-private for testing
+ HashPMap<K, V> underlying() {
+ return underlying;
+ }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSet.java b/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSet.java
new file mode 100644
index 00000000000..8a50326ef1f
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSet.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable.pcollections;
+
+import org.apache.kafka.server.immutable.ImmutableSet;
+import org.pcollections.HashTreePSet;
+import org.pcollections.MapPSet;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * An {@link ImmutableSet} backed by a pcollections {@link MapPSet}; every method forwards to that delegate.
+ * <p>
+ * {@link #added(Object)} and {@link #removed(Object)} wrap the set returned by the delegate in a new instance.
+ * The mutating {@link java.util.Set} methods are forwarded as well and are expected to be rejected by the delegate.
+ */
+@SuppressWarnings("deprecation")
+public class PCollectionsImmutableSet<E> implements ImmutableSet<E> {
+    private final MapPSet<E> underlying;
+
+    /**
+     * @param <E> the element type
+     * @return a wrapped hash-based persistent set that is empty
+     */
+    public static <E> PCollectionsImmutableSet<E> empty() {
+        return new PCollectionsImmutableSet<>(HashTreePSet.empty());
+    }
+
+    /**
+     * @param <E> the element type
+     * @param e the element
+     * @return a wrapped hash-based persistent set that has a single element
+     */
+    public static <E> PCollectionsImmutableSet<E> singleton(E e) {
+        return new PCollectionsImmutableSet<>(HashTreePSet.singleton(e));
+    }
+
+    /**
+     * @param set the persistent set to wrap
+     * @throws NullPointerException if {@code set} is null
+     */
+    public PCollectionsImmutableSet(MapPSet<E> set) {
+        this.underlying = Objects.requireNonNull(set);
+    }
+
+    /**
+     * @param e the element to add
+     * @return a new wrapper around the set returned by the delegate's {@code plus(e)}
+     */
+    @Override
+    public ImmutableSet<E> added(E e) {
+        return new PCollectionsImmutableSet<>(underlying().plus(e));
+    }
+
+    /**
+     * @param e the element to remove
+     * @return a new wrapper around the set returned by the delegate's {@code minus(e)}
+     */
+    @Override
+    public ImmutableSet<E> removed(E e) {
+        return new PCollectionsImmutableSet<>(underlying().minus(e));
+    }
+
+    @Override
+    public int size() {
+        return underlying().size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return underlying().isEmpty();
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        return underlying().contains(o);
+    }
+
+    @Override
+    public Iterator<E> iterator() {
+        return underlying().iterator();
+    }
+
+    @Override
+    public void forEach(Consumer<? super E> action) {
+        underlying().forEach(action);
+    }
+
+    @Override
+    public Object[] toArray() {
+        return underlying().toArray();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        return underlying().toArray(a);
+    }
+
+    @Override
+    public boolean add(E e) {
+        // mutator: expected to throw UnsupportedOperationException in the delegate; forwarded anyway so delegation can be tested
+        return underlying().add(e);
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        // mutator: expected to throw UnsupportedOperationException in the delegate; forwarded anyway so delegation can be tested
+        return underlying().remove(o);
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        return underlying().containsAll(c);
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends E> c) {
+        // mutator: expected to throw UnsupportedOperationException in the delegate; forwarded anyway so delegation can be tested
+        return underlying().addAll(c);
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        // mutator: expected to throw UnsupportedOperationException in the delegate; forwarded anyway so delegation can be tested
+        return underlying().retainAll(c);
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        // mutator: expected to throw UnsupportedOperationException in the delegate; forwarded anyway so delegation can be tested
+        return underlying().removeAll(c);
+    }
+
+    @Override
+    public boolean removeIf(Predicate<? super E> filter) {
+        // mutator: expected to throw UnsupportedOperationException in the delegate; forwarded anyway so delegation can be tested
+        return underlying().removeIf(filter);
+    }
+
+    @Override
+    public void clear() {
+        // mutator: expected to throw UnsupportedOperationException in the delegate; forwarded anyway so delegation can be tested
+        underlying().clear();
+    }
+
+    @Override
+    public Spliterator<E> spliterator() {
+        return underlying().spliterator();
+    }
+
+    @Override
+    public Stream<E> stream() {
+        return underlying().stream();
+    }
+
+    @Override
+    public Stream<E> parallelStream() {
+        return underlying().parallelStream();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        // stricter than java.util.Set#equals: only another PCollectionsImmutableSet with an equal delegate matches
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PCollectionsImmutableSet<?> that = (PCollectionsImmutableSet<?>) o;
+        return Objects.equals(underlying(), that.underlying());
+    }
+
+    @Override
+    public int hashCode() {
+        return underlying().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "PCollectionsImmutableSet{" +
+            "underlying=" + underlying() +
+            '}';
+    }
+
+    // package-private for testing; returns the wrapped delegate itself, not a copy
+    MapPSet<E> underlying() {
+        return this.underlying;
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java b/server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java
new file mode 100644
index 00000000000..e657e9b11c4
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/immutable/DelegationChecker.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import org.mockito.Mockito;
+
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Facilitate testing of wrapper class delegation.
+ *
+ * We require the following things to test delegation:
+ *
+ * 1. A mock object to which the wrapper is expected to delegate method invocations
+ * 2. A way to define how the mock is expected to behave when its method is invoked
+ * 3. A way to define how to invoke the method on the wrapper
+ * 4. A way to test that the method on the mock is invoked correctly when the wrapper method is invoked
+ * 5. A way to test that any return value from the wrapper method is correct
+ *
+ * @param <D> delegate type
+ * @param <W> wrapper type
+ * @param <T> delegating method return type, if any
+ */
+public abstract class DelegationChecker<D, W, T> {
+    private final D mock;
+    private final W wrapper;
+    private Consumer<D> mockConsumer;
+    private Function<D, T> mockConfigurationFunction;
+    private T mockFunctionReturnValue;
+    private Consumer<W> wrapperConsumer;
+    private Function<W, T> wrapperFunctionApplier;
+    private Function<T, ?> mockFunctionReturnValueTransformation;
+    private boolean expectWrapperToWrapMockFunctionReturnValue;
+    private boolean persistentCollectionMethodInvokedCorrectly = false;
+
+    /**
+     * @param mock mock for the underlying delegate
+     * @param wrapperCreator how to create a wrapper for the mock
+     */
+    protected DelegationChecker(D mock, Function<D, W> wrapperCreator) {
+        this.mock = Objects.requireNonNull(mock);
+        this.wrapper = Objects.requireNonNull(wrapperCreator).apply(mock);
+    }
+
+    /**
+     * @param wrapper the wrapper
+     * @return the underlying delegate for the given wrapper
+     */
+    public abstract D unwrap(W wrapper);
+
+    /**
+     * Use for delegate methods whose invocation is checked without regard to a return value.
+     *
+     * @param mockConsumer invokes the expected method on the stubbing handle created for the mock
+     * @return this checker, for chaining
+     */
+    public DelegationChecker<D, W, T> defineMockConfigurationForVoidMethodInvocation(Consumer<D> mockConsumer) {
+        this.mockConsumer = Objects.requireNonNull(mockConsumer);
+        return this;
+    }
+
+    /**
+     * @param mockConfigurationFunction invokes the expected method on the mock so that it can be stubbed
+     * @param mockFunctionReturnValue the value the stubbed mock method returns; may be null
+     * @return this checker, for chaining
+     */
+    public DelegationChecker<D, W, T> defineMockConfigurationForFunctionInvocation(Function<D, T> mockConfigurationFunction, T mockFunctionReturnValue) {
+        this.mockConfigurationFunction = Objects.requireNonNull(mockConfigurationFunction);
+        this.mockFunctionReturnValue = mockFunctionReturnValue;
+        return this;
+    }
+
+    /**
+     * @param wrapperConsumer invokes the method under test on the wrapper
+     * @return this checker, for chaining
+     */
+    public DelegationChecker<D, W, T> defineWrapperVoidMethodInvocation(Consumer<W> wrapperConsumer) {
+        this.wrapperConsumer = Objects.requireNonNull(wrapperConsumer);
+        return this;
+    }
+
+    /**
+     * @param wrapperFunctionApplier invokes the method under test on the wrapper and returns its result
+     * @param expectedFunctionReturnValueTransformation maps the mock's return value to the value expected from the wrapper
+     * @param <R> type of the expected wrapper return value
+     * @return this checker, for chaining
+     */
+    public <R> DelegationChecker<D, W, T> defineWrapperFunctionInvocationAndMockReturnValueTransformation(
+        Function<W, T> wrapperFunctionApplier,
+        Function<T, R> expectedFunctionReturnValueTransformation) {
+        this.wrapperFunctionApplier = Objects.requireNonNull(wrapperFunctionApplier);
+        this.mockFunctionReturnValueTransformation = Objects.requireNonNull(expectedFunctionReturnValueTransformation);
+        return this;
+    }
+
+    /**
+     * Declares that the wrapper returns another wrapper, so the result is passed through {@link #unwrap(Object)} before comparison.
+     *
+     * @return this checker, for chaining
+     */
+    public DelegationChecker<D, W, T> expectWrapperToWrapMockFunctionReturnValue() {
+        this.expectWrapperToWrapMockFunctionReturnValue = true;
+        return this;
+    }
+
+    /**
+     * Runs the check configured through the two void-method definitions.
+     *
+     * @throws IllegalStateException if either void-method definition is missing or any function-style definition was supplied
+     */
+    public void doVoidMethodDelegationCheck() {
+        if (mockConsumer == null || wrapperConsumer == null ||
+            mockConfigurationFunction != null || wrapperFunctionApplier != null ||
+            mockFunctionReturnValue != null || mockFunctionReturnValueTransformation != null) {
+            throwExceptionForIllegalTestSetup();
+        }
+        // configure the mock to record that the expected method was invoked
+        mockConsumer.accept(Mockito.doAnswer(invocation -> {
+            persistentCollectionMethodInvokedCorrectly = true;
+            return null;
+        }).when(mock));
+        // invoke the wrapper, which should invoke the mock as desired
+        wrapperConsumer.accept(wrapper);
+        // assert that the expected delegation to the mock actually occurred
+        assertTrue(persistentCollectionMethodInvokedCorrectly, "the wrapper did not invoke the configured method on the mock");
+    }
+
+    /**
+     * Runs the check configured through the two function definitions and compares the wrapper's return value
+     * with the transformed mock return value.
+     *
+     * @throws IllegalStateException if a function definition is missing or any void-method definition was supplied
+     */
+    @SuppressWarnings("unchecked")
+    public void doFunctionDelegationCheck() {
+        if (mockConfigurationFunction == null || wrapperFunctionApplier == null ||
+            mockFunctionReturnValueTransformation == null ||
+            mockConsumer != null || wrapperConsumer != null) {
+            throwExceptionForIllegalTestSetup();
+        }
+        // configure the mock to record the invocation and hand back the configured return value
+        when(mockConfigurationFunction.apply(mock)).thenAnswer(invocation -> {
+            persistentCollectionMethodInvokedCorrectly = true;
+            return mockFunctionReturnValue;
+        });
+        // invoke the wrapper, which should invoke the mock as desired
+        T wrapperReturnValue = wrapperFunctionApplier.apply(wrapper);
+        // assert that the expected delegation to the mock actually occurred, including any return value transformation
+        assertTrue(persistentCollectionMethodInvokedCorrectly, "the wrapper did not invoke the configured method on the mock");
+        Object transformedMockFunctionReturnValue = mockFunctionReturnValueTransformation.apply(mockFunctionReturnValue);
+        if (this.expectWrapperToWrapMockFunctionReturnValue) {
+            assertEquals(transformedMockFunctionReturnValue, unwrap((W) wrapperReturnValue));
+        } else {
+            assertEquals(transformedMockFunctionReturnValue, wrapperReturnValue);
+        }
+    }
+
+    private static void throwExceptionForIllegalTestSetup() {
+        throw new IllegalStateException(
+            "test setup error: must define both mock and wrapper consumers or both mock and wrapper functions");
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java b/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java
new file mode 100644
index 00000000000..ab32b32be72
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMapTest.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable.pcollections;
+
+import org.apache.kafka.server.immutable.DelegationChecker;
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.pcollections.HashPMap;
+import org.pcollections.HashTreePMap;
+
+import java.util.Collections;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static java.util.function.Function.identity;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Verifies that {@link PCollectionsImmutableMap} forwards each method to the wrapped {@link HashPMap},
+ * using a Mockito mock of the delegate and {@link DelegationChecker} to observe the calls.
+ */
+@SuppressWarnings({"unchecked", "deprecation"})
+public class PCollectionsImmutableMapTest {
+    private static final HashPMap<Object, Object> SINGLETON_MAP = HashTreePMap.singleton(new Object(), new Object());
+
+    private static final class PCollectionsHashMapWrapperDelegationChecker<R> extends DelegationChecker<HashPMap<Object, Object>, PCollectionsImmutableMap<Object, Object>, R> {
+        public PCollectionsHashMapWrapperDelegationChecker() {
+            super(mock(HashPMap.class), PCollectionsImmutableMap::new);
+        }
+
+        @Override
+        public HashPMap<Object, Object> unwrap(PCollectionsImmutableMap<Object, Object> wrapper) {
+            return wrapper.underlying();
+        }
+    }
+
+    @Test
+    public void testEmptyMap() {
+        Assertions.assertEquals(HashTreePMap.empty(), ((PCollectionsImmutableMap<?, ?>) ImmutableMap.empty()).underlying());
+    }
+
+    @Test
+    public void testSingletonMap() {
+        Assertions.assertEquals(HashTreePMap.singleton(1, 2), ((PCollectionsImmutableMap<?, ?>) ImmutableMap.singleton(1, 2)).underlying());
+    }
+
+    @Test
+    public void testUnderlying() {
+        assertSame(SINGLETON_MAP, new PCollectionsImmutableMap<>(SINGLETON_MAP).underlying());
+    }
+
+    @Test
+    public void testDelegationOfAfterAdding() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.plus(eq(this), eq(this)), SINGLETON_MAP)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.updated(this, this), identity())
+            .expectWrapperToWrapMockFunctionReturnValue()
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfAfterRemoving() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.minus(eq(this)), SINGLETON_MAP)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removed(this), identity())
+            .expectWrapperToWrapMockFunctionReturnValue()
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2})
+    public void testDelegationOfSize(int mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(HashPMap::size, mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::size, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfIsEmpty(boolean mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(HashPMap::isEmpty, mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::isEmpty, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfContainsKey(boolean mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.containsKey(eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.containsKey(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfContainsValue(boolean mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.containsValue(eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.containsValue(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfGet() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.get(eq(this)), new Object())
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.get(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfPut() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.put(eq(this), eq(this)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.put(this, this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfRemoveByKey() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfPutAll() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(mock -> mock.putAll(eq(Collections.emptyMap())))
+            .defineWrapperVoidMethodInvocation(wrapper -> wrapper.putAll(Collections.emptyMap()))
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfClear() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(HashPMap::clear)
+            .defineWrapperVoidMethodInvocation(PCollectionsImmutableMap::clear)
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfKeySet() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(HashPMap::keySet, Collections.emptySet())
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::keySet, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfValues() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(HashPMap::values, Collections.emptySet())
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::values, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfEntrySet() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(HashPMap::entrySet, Collections.emptySet())
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::entrySet, identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testEquals() {
+        // same delegate => equal wrappers; different delegates => unequal wrappers
+        final HashPMap<Object, Object> mock = mock(HashPMap.class);
+        assertEquals(new PCollectionsImmutableMap<>(mock), new PCollectionsImmutableMap<>(mock));
+        final HashPMap<Object, Object> someOtherMock = mock(HashPMap.class);
+        assertNotEquals(new PCollectionsImmutableMap<>(mock), new PCollectionsImmutableMap<>(someOtherMock));
+    }
+
+    @Test
+    public void testHashCode() {
+        // the wrapper is expected to report the hash code of the delegate it wraps
+        final HashPMap<Object, Object> mock = mock(HashPMap.class);
+        assertEquals(mock.hashCode(), new PCollectionsImmutableMap<>(mock).hashCode());
+        final HashPMap<Object, Object> someOtherMock = mock(HashPMap.class);
+        assertNotEquals(mock.hashCode(), new PCollectionsImmutableMap<>(someOtherMock).hashCode());
+    }
+
+    @Test
+    public void testDelegationOfGetOrDefault() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.getOrDefault(eq(this), eq(this)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.getOrDefault(this, this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfForEach() {
+        final BiConsumer<Object, Object> mockBiConsumer = mock(BiConsumer.class);
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(mock -> mock.forEach(eq(mockBiConsumer)))
+            .defineWrapperVoidMethodInvocation(wrapper -> wrapper.forEach(mockBiConsumer))
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfReplaceAll() {
+        final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(mock -> mock.replaceAll(eq(mockBiFunction)))
+            .defineWrapperVoidMethodInvocation(wrapper -> wrapper.replaceAll(mockBiFunction))
+            .doVoidMethodDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfPutIfAbsent() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.putIfAbsent(eq(this), eq(this)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.putIfAbsent(this, this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfRemoveByKeyAndValue(boolean mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this), eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this, this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfReplaceWhenMappedToSpecificValue(boolean mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this), eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this, this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfReplaceWhenMappedToAnyValue() {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfComputeIfAbsent() {
+        final Function<Object, Object> mockFunction = mock(Function.class);
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.computeIfAbsent(eq(this), eq(mockFunction)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.computeIfAbsent(this, mockFunction), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfComputeIfPresent() {
+        final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.computeIfPresent(eq(this), eq(mockBiFunction)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.computeIfPresent(this, mockBiFunction), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfCompute() {
+        final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.compute(eq(this), eq(mockBiFunction)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.compute(this, mockBiFunction), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @Test
+    public void testDelegationOfMerge() {
+        final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(mock -> mock.merge(eq(this), eq(this), eq(mockBiFunction)), this)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.merge(this, this, mockBiFunction), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"a", "b"})
+    public void testDelegationOfToString(String mockFunctionReturnValue) {
+        new PCollectionsHashMapWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(HashPMap::toString, mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableMap::toString,
+                text -> "PCollectionsImmutableMap{underlying=" + text + "}")
+            .doFunctionDelegationCheck();
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java b/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java
new file mode 100644
index 00000000000..457488404f9
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableSetTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable.pcollections;
+
+import org.apache.kafka.server.immutable.DelegationChecker;
+import org.apache.kafka.server.immutable.ImmutableSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.pcollections.HashTreePSet;
+import org.pcollections.MapPSet;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import static java.util.function.Function.identity;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link PCollectionsImmutableSet}.
+ * <p>
+ * Covers the {@link ImmutableSet} factory methods, access to the wrapped {@link MapPSet}, the
+ * {@code equals}/{@code hashCode}/{@code toString} implementations and, one method at a time,
+ * delegation from the wrapper to a mocked {@link MapPSet}. A delegation test only describes the
+ * call expected on the mock and the matching call on the wrapper; the check itself is carried
+ * out by {@link DelegationChecker}. Tests that pass {@code identity()} configure no transformation
+ * of the value returned by the mock.
+ */
+@SuppressWarnings({"unchecked", "deprecation"})
+public class PCollectionsImmutableSetTest {
+
+    /** Real (non-mock) one-element set, used where a test needs a concrete {@link MapPSet} value. */
+    private static final MapPSet<Object> SINGLETON_SET = HashTreePSet.singleton(new Object());
+
+    /**
+     * {@link DelegationChecker} whose delegate is a Mockito mock of {@link MapPSet} and whose
+     * wrapper is a {@link PCollectionsImmutableSet} created around that mock.
+     *
+     * @param <R> passed through as the third type argument of {@link DelegationChecker}
+     */
+    private static final class PCollectionsHashSetWrapperDelegationChecker<R> extends DelegationChecker<MapPSet<Object>, PCollectionsImmutableSet<Object>, R> {
+        public PCollectionsHashSetWrapperDelegationChecker() {
+            super(mock(MapPSet.class), delegate -> new PCollectionsImmutableSet<>(delegate));
+        }
+
+        /** Returns the set that the given wrapper reports as its underlying collection. */
+        public MapPSet<Object> unwrap(PCollectionsImmutableSet<Object> wrapper) {
+            return wrapper.underlying();
+        }
+    }
+
+    /** {@code ImmutableSet.empty()} must be backed by a set equal to {@code HashTreePSet.empty()}. */
+    @Test
+    public void testEmptySet() {
+        final PCollectionsImmutableSet<?> emptySet = (PCollectionsImmutableSet<?>) ImmutableSet.empty();
+        Assertions.assertEquals(HashTreePSet.empty(), emptySet.underlying());
+    }
+
+    /** {@code ImmutableSet.singleton(1)} must be backed by a set equal to {@code HashTreePSet.singleton(1)}. */
+    @Test
+    public void testSingletonSet() {
+        final PCollectionsImmutableSet<?> singletonSet = (PCollectionsImmutableSet<?>) ImmutableSet.singleton(1);
+        Assertions.assertEquals(HashTreePSet.singleton(1), singletonSet.underlying());
+    }
+
+    /** {@code underlying()} must hand back the very instance the wrapper was constructed with. */
+    @Test
+    public void testUnderlying() {
+        final PCollectionsImmutableSet<Object> wrapper = new PCollectionsImmutableSet<>(SINGLETON_SET);
+        assertSame(SINGLETON_SET, wrapper.underlying());
+    }
+
+    /**
+     * Delegation check pairing {@code added(e)} on the wrapper with {@code plus(e)} on the delegate;
+     * the checker is told to expect the mock's return value to come back wrapped.
+     */
+    @Test
+    public void testDelegationOfAfterAdding() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.plus(eq(this)), SINGLETON_SET)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.added(this), identity())
+            .expectWrapperToWrapMockFunctionReturnValue()
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Delegation check pairing {@code removed(e)} on the wrapper with {@code minus(e)} on the delegate;
+     * the checker is told to expect the mock's return value to come back wrapped.
+     */
+    @Test
+    public void testDelegationOfAfterRemoving() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.minus(eq(this)), SINGLETON_SET)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.removed(this), identity())
+            .expectWrapperToWrapMockFunctionReturnValue()
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Delegation check for {@code size()}.
+     *
+     * @param mockFunctionReturnValue size the mock delegate is set up to report
+     */
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2})
+    public void testDelegationOfSize(int mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.size(), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.size(), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Delegation check for {@code isEmpty()}.
+     *
+     * @param mockFunctionReturnValue answer the mock delegate is set up to give
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfIsEmpty(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.isEmpty(), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.isEmpty(), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Delegation check for {@code contains(o)}, using this test instance as the element.
+     *
+     * @param mockFunctionReturnValue answer the mock delegate is set up to give
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfContains(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.contains(eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.contains(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /** Delegation check for {@code iterator()}; the mock delegate returns a mocked {@link Iterator}. */
+    @Test
+    public void testDelegationOfIterator() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.iterator(), mock(Iterator.class))
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.iterator(), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /** Delegation check for {@code forEach(action)}, run through the checker's void-method variant. */
+    @Test
+    public void testDelegationOfForEach() {
+        final Consumer<Object> action = mock(Consumer.class);
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(delegate -> delegate.forEach(eq(action)))
+            .defineWrapperVoidMethodInvocation(set -> set.forEach(action))
+            .doVoidMethodDelegationCheck();
+    }
+
+    /** Delegation check for the no-argument {@code toArray()}; the mock delegate returns an empty array. */
+    @Test
+    public void testDelegationOfToArray() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.toArray(), new Object[0])
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.toArray(), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Delegation check for {@code toArray(T[])}. The array handed to the wrapper is the same one
+     * the mock configuration matches on; the mock returns a separate empty array.
+     */
+    @Test
+    public void testDelegationOfToArrayIntoGivenDestination() {
+        final Object[] destination = new Object[0];
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.toArray(eq(destination)), new Object[0])
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.toArray(destination), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Delegation check for {@code add(e)}, using this test instance as the element.
+     *
+     * @param mockFunctionReturnValue result the mock delegate is set up to return
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfAdd(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.add(eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.add(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Delegation check for {@code remove(o)}, using this test instance as the element.
+     *
+     * @param mockFunctionReturnValue result the mock delegate is set up to return
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfRemove(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.remove(eq(this)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.remove(this), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Delegation check for {@code containsAll(c)} with an empty list as the argument.
+     *
+     * @param mockFunctionReturnValue result the mock delegate is set up to return
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfContainsAll(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.containsAll(eq(Collections.emptyList())), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.containsAll(Collections.emptyList()), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Delegation check for {@code addAll(c)} with an empty list as the argument.
+     *
+     * @param mockFunctionReturnValue result the mock delegate is set up to return
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfAddAll(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.addAll(eq(Collections.emptyList())), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.addAll(Collections.emptyList()), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Delegation check for {@code retainAll(c)} with an empty list as the argument.
+     *
+     * @param mockFunctionReturnValue result the mock delegate is set up to return
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfRetainAll(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.retainAll(eq(Collections.emptyList())), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.retainAll(Collections.emptyList()), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Delegation check for {@code removeAll(c)} with an empty list as the argument.
+     *
+     * @param mockFunctionReturnValue result the mock delegate is set up to return
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfRemoveAll(boolean mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.removeAll(eq(Collections.emptyList())), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.removeAll(Collections.emptyList()), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Delegation check for {@code removeIf(filter)} with a mocked {@link Predicate}.
+     *
+     * @param mockFunctionReturnValue result the mock delegate is set up to return
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testDelegationOfRemoveIf(boolean mockFunctionReturnValue) {
+        final Predicate<Object> filter = mock(Predicate.class);
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.removeIf(eq(filter)), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.removeIf(filter), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /** Delegation check for {@code clear()}, run through the checker's void-method variant. */
+    @Test
+    public void testDelegationOfClear() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForVoidMethodInvocation(delegate -> delegate.clear())
+            .defineWrapperVoidMethodInvocation(set -> set.clear())
+            .doVoidMethodDelegationCheck();
+    }
+
+    /** Delegation check for {@code spliterator()}; the mock delegate returns a mocked {@link Spliterator}. */
+    @Test
+    public void testDelegationOfSpliterator() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.spliterator(), mock(Spliterator.class))
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.spliterator(), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /** Delegation check for {@code stream()}; the mock delegate returns a mocked {@link Stream}. */
+    @Test
+    public void testDelegationOfStream() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.stream(), mock(Stream.class))
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.stream(), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /** Delegation check for {@code parallelStream()}; the mock delegate returns a mocked {@link Stream}. */
+    @Test
+    public void testDelegationOfParallelStream() {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.parallelStream(), mock(Stream.class))
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.parallelStream(), identity())
+            .doFunctionDelegationCheck();
+    }
+
+    /**
+     * Two distinct wrappers around the same delegate must be equal; wrappers around two different
+     * mock delegates must not be.
+     */
+    @Test
+    public void testEquals() {
+        final MapPSet<Object> delegate = mock(MapPSet.class);
+        final PCollectionsImmutableSet<Object> first = new PCollectionsImmutableSet<>(delegate);
+        final PCollectionsImmutableSet<Object> second = new PCollectionsImmutableSet<>(delegate);
+        assertEquals(first, second);
+
+        final MapPSet<Object> otherDelegate = mock(MapPSet.class);
+        final PCollectionsImmutableSet<Object> aroundDelegate = new PCollectionsImmutableSet<>(delegate);
+        final PCollectionsImmutableSet<Object> aroundOtherDelegate = new PCollectionsImmutableSet<>(otherDelegate);
+        assertNotEquals(aroundDelegate, aroundOtherDelegate);
+    }
+
+    /**
+     * A wrapper's hash code must equal that of its delegate, and must differ from the hash code of
+     * a delegate it does not wrap (here: a second, separately created mock).
+     */
+    @Test
+    public void testHashCode() {
+        final MapPSet<Object> delegate = mock(MapPSet.class);
+        final PCollectionsImmutableSet<Object> aroundDelegate = new PCollectionsImmutableSet<>(delegate);
+        assertEquals(delegate.hashCode(), aroundDelegate.hashCode());
+
+        final MapPSet<Object> otherDelegate = mock(MapPSet.class);
+        final PCollectionsImmutableSet<Object> aroundOtherDelegate = new PCollectionsImmutableSet<>(otherDelegate);
+        assertNotEquals(delegate.hashCode(), aroundOtherDelegate.hashCode());
+    }
+
+    /**
+     * Delegation check for {@code toString()}: the value configured for the wrapper is the mock's
+     * text embedded in {@code PCollectionsImmutableSet{underlying=...}}.
+     *
+     * @param mockFunctionReturnValue text the mock delegate returns from {@code toString()}
+     */
+    @ParameterizedTest
+    @ValueSource(strings = {"a", "b"})
+    public void testDelegationOfToString(String mockFunctionReturnValue) {
+        new PCollectionsHashSetWrapperDelegationChecker<>()
+            .defineMockConfigurationForFunctionInvocation(delegate -> delegate.toString(), mockFunctionReturnValue)
+            .defineWrapperFunctionInvocationAndMockReturnValueTransformation(set -> set.toString(),
+                underlyingText -> "PCollectionsImmutableSet{underlying=" + underlyingText + "}")
+            .doFunctionDelegationCheck();
+    }
+}