Posted to jira@kafka.apache.org by "rondagostino (via GitHub)" <gi...@apache.org> on 2023/02/20 20:04:14 UTC

[GitHub] [kafka] rondagostino opened a new pull request, #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

rondagostino opened a new pull request, #13280:
URL: https://github.com/apache/kafka/pull/13280

   …topic counts
   
   Performance of KRaft metadata image changes is currently O(<# of topics in cluster>). This means the amount of time it takes to create just a single topic scales linearly with the number of topics in the entire cluster. This impacts both controllers and brokers, because both use the metadata image to represent the KRaft metadata log. The performance of these changes should scale with the number of topics being changed – creating a single topic should perform similarly regardless of the number of topics in the cluster.
   
   This patch introduces a dependency on the [Paguro](https://github.com/GlenKPeterson/Paguro/) library for immutable/persistent collection support in Java and leverages persistent data structures to avoid copying the entire TopicsImage upon every change.  We chose this library because it is relatively small and [well-integrated](https://github.com/GlenKPeterson/Paguro/blob/main/inheritanceHierarchy.pdf) with the existing Java Collections class hierarchy (the latter property especially helps to minimize the changes required to introduce the library into the existing code base).  The patch also adds the following JMH benchmarks demonstrating the resulting performance changes:
   
   - `TopicsImageSingleRecordChangeBenchmark` tracks how long it takes to create a new topic.  This is the benchmark that clearly identifies the O(N) behavior in the existing code and that most dramatically illustrates a performance improvement.
   
   As shown below, the existing code takes several orders of magnitude longer to make a single change than the new code.  The existing code, with 12,500 topics, took 1.4 milliseconds on my laptop, and its cost grows at least linearly as the number of topics grows (about 22x for an 8x increase in topics).  The new code took a constant amount of time (~250 nanoseconds) regardless of the number of topics in the cluster.
   
   The improvement comes from the fact that adding, updating, or deleting an entry in an immutable, persistent map to create a new persistent map is inexpensive.  The new map shares the vast majority of its structure with the old map; only the root node and the nodes along the path to the node that must change are swapped out, and once the reference to the old map is released, the swapped-out nodes can be garbage-collected.
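
   To make the path-copying idea concrete, here is a minimal sketch of a persistent map. It is a hypothetical, unbalanced binary search tree written only for illustration; Paguro and similar libraries use more sophisticated structures (hash tries, balanced trees) that keep the copied path short, but the sharing works the same way: an update copies the nodes from the root down to the changed entry and reuses every other subtree by reference.

   ```java
   import java.util.Objects;

   // Hypothetical illustration only: an unbalanced persistent BST with path copying.
   public final class PathCopyingMap<K extends Comparable<K>, V> {

       // Immutable node; never modified after construction.
       static final class Node<K, V> {
           final K key;
           final V value;
           final Node<K, V> left;
           final Node<K, V> right;

           Node(K key, V value, Node<K, V> left, Node<K, V> right) {
               this.key = key;
               this.value = value;
               this.left = left;
               this.right = right;
           }
       }

       final Node<K, V> root;

       private PathCopyingMap(Node<K, V> root) {
           this.root = root;
       }

       public static <K extends Comparable<K>, V> PathCopyingMap<K, V> empty() {
           return new PathCopyingMap<>(null);
       }

       // Returns a new map containing the mapping; this map is left untouched.
       public PathCopyingMap<K, V> updated(K key, V value) {
           return new PathCopyingMap<>(insert(root, Objects.requireNonNull(key), value));
       }

       // Copies only the nodes on the root-to-key path; subtrees off that path are shared.
       private static <K extends Comparable<K>, V> Node<K, V> insert(Node<K, V> node, K key, V value) {
           if (node == null) {
               return new Node<>(key, value, null, null);
           }
           int cmp = key.compareTo(node.key);
           if (cmp < 0) {
               return new Node<>(node.key, node.value, insert(node.left, key, value), node.right);
           } else if (cmp > 0) {
               return new Node<>(node.key, node.value, node.left, insert(node.right, key, value));
           }
           return new Node<>(key, value, node.left, node.right); // replace value at this node
       }

       public V get(K key) {
           Node<K, V> node = root;
           while (node != null) {
               int cmp = key.compareTo(node.key);
               if (cmp == 0) {
                   return node.value;
               }
               node = cmp < 0 ? node.left : node.right;
           }
           return null;
       }

       public static void main(String[] args) {
           PathCopyingMap<String, Integer> v1 = PathCopyingMap.<String, Integer>empty()
               .updated("m", 1).updated("c", 2).updated("x", 3);
           PathCopyingMap<String, Integer> v2 = v1.updated("z", 4);

           System.out.println(v1.get("z"));                  // null: old version unchanged
           System.out.println(v2.get("z"));                  // 4
           System.out.println(v1.root.left == v2.root.left); // true: left subtree is shared
       }
   }
   ```

   Adding `"z"` copies only the `"m"` and `"x"` nodes; the `"c"` subtree is the same object in both versions, and once `v1` is no longer referenced its two replaced nodes become garbage.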
   
   **Current Code, unpatched**
   Total Topic Count | nanoseconds/op | error
   -- | -- | --
   12,500 | 1,410,901 | 153,461
   25,000 | 3,570,451 | 221,992
   50,000 | 14,143,125 | 1,123,616
   100,000 | 31,126,930 | 4,243,625
   
   **Updated Code**
   Total Topic Count | nanoseconds/op | error
   -- | -- | --
   12,500 | 258 | 13
   25,000 | 265 | 8
   50,000 | 273 | 5
   100,000 | 222 | 4
   
   
   - `TopicsImageZonalOutageBenchmark` simulates a zonal outage where each broker in the zone will lose its session – in this benchmark we assume the controller deals with them one by one rather than demoting 1/3 of the cluster all at once.  Since the number of topics per broker does not change very much, we expect O(N) behavior with the current code but not with the updated code, so we should see a performance improvement here as well -- and in fact we do.
   
   The existing code scales with the number of topics in the cluster, so the time roughly doubles as the cluster size doubles, increasing from 5ms to 47ms (a factor of 9) as the cluster scales by a factor of 8.  The updated code should scale with the number of affected topics, which in this case, given the (debatable) choice of 10,000 replicas per broker and 10 partitions per topic, grows by a factor of 1.6 (from 4,167 to 6,667 affected topics) as the cluster scales by a factor of 8.  In fact we see the time spent increase by a factor of 2.6 (from 4.4 ms to 11.6 ms) over that same 8x scaling.  This is a bit higher than expected, but it is still sub-linear (and there is some +/- error in these numbers, so the sub-linear behavior, rather than the specific factor, is the real point).
   
   **Current Code, unpatched**
   Total Topic Count | milliseconds/op | error | (Brokers Impacted) | (Topics Impacted)
   -- | -- | -- | -- | --
   12,500 | 5.2 | 0.4 | 1/36 | 4,167
   25,000 | 10.6 | 0.1 | 1/75 | 5,000
   50,000 | 21.7 | 0.4 | 1/150 | 6,667
   100,000 | 47.7 | 5.2 | 1/300 | 6,667
   
   **Updated Code**
   Total Topic Count | milliseconds/op | error | (Brokers Impacted) | (Topics Impacted)
   -- | -- | -- | -- | --
   12,500 | 4.4 | 0.2 | 1/36 | 4,167
   25,000 | 6.9 | 0.2 | 1/75 | 5,000
   50,000 | 10.2 | 2.5 | 1/150 | 6,667
   100,000 | 11.6 | 2.8 | 1/300 | 6,667
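
   The scaling factors quoted for this benchmark can be recomputed from the first and last rows of the tables. A small sketch; the class and method names are illustrative only:

   ```java
   import java.util.Locale;

   public final class ZonalOutageScaling {
       // Ratio of the largest-cluster value to the smallest-cluster value.
       static double growth(double smallest, double largest) {
           return largest / smallest;
       }

       public static void main(String[] args) {
           double cluster = growth(12_500, 100_000);     // Total Topic Count
           double topicsImpacted = growth(4_167, 6_667); // (Topics Impacted)
           double current = growth(5.2, 47.7);           // unpatched, ms/op
           double updated = growth(4.4, 11.6);           // updated, ms/op
           System.out.println(String.format(Locale.ROOT,
               "cluster x%.1f, topics impacted x%.1f, current x%.1f, updated x%.1f",
               cluster, topicsImpacted, current, updated));
           // cluster x8.0, topics impacted x1.6, current x9.2, updated x2.6
       }
   }
   ```

   The unpatched time grows slightly faster than the 8x cluster growth, while the updated time grows far less than 8x, which is the sub-linear behavior described above.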
   
   
   - `TopicsImageSnapshotLoadBenchmark` simulates the loading of a snapshot when the broker starts – i.e. load up 100,000 topics/1M partitions from scratch and commit them all at once.  We would expect to see some performance degradation here in the updated code, and the question really was by how much.
   
   This is the benchmark that simulates the case where every topic is affected – e.g. when we load the initial snapshot during startup.  We expect the persistent data structures to perform worse here because every perturbation that creates a new tree replaces some path in the old tree and shares the remainder.  That’s a lot of replacing and a lot of garbage collecting.  In these runs it turns out to be a penalty of roughly 15% to 60%, depending on the topic count.  This happens far less frequently than the scenarios described above – people create topics and ISRs change more frequently than brokers roll – so the performance degradation here seems like a reasonable penalty to accept.
   
   **Current Code, unpatched**
   Total Topic Count | milliseconds/op | error
   -- | -- | --
   12,500 | 9.2 | 0.2
   25,000 | 20.7 | 1.9
   50,000 | 41.6 | 2.9
   100,000 | 110.8 | 12.4
   
   **Updated Code**
   Total Topic Count | milliseconds/op | error
   -- | -- | --
   12,500 | 13.7 | 1.3
   25,000 | 28.9 | 1.4
   50,000 | 67.6 | 1.6
   100,000 | 126.0 | 12.9
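
   As a quick check on the size of the snapshot-load penalty, the per-row slowdown can be computed from the two tables above. A small sketch with illustrative names; the values are copied from the tables:

   ```java
   import java.util.Locale;

   public final class SnapshotLoadPenalty {
       // Slowdown of the updated code relative to the unpatched code, in percent.
       static double penaltyPercent(double currentMs, double updatedMs) {
           return (updatedMs / currentMs - 1.0) * 100.0;
       }

       public static void main(String[] args) {
           int[] topics = {12_500, 25_000, 50_000, 100_000};
           double[] current = {9.2, 20.7, 41.6, 110.8};  // ms/op, unpatched table
           double[] updated = {13.7, 28.9, 67.6, 126.0}; // ms/op, updated table
           for (int i = 0; i < topics.length; i++) {
               System.out.println(String.format(Locale.ROOT, "%d topics: +%.1f%%",
                   topics[i], penaltyPercent(current[i], updated[i])));
           }
           // 12500 topics: +48.9%
           // 25000 topics: +39.6%
           // 50000 topics: +62.5%
           // 100000 topics: +13.7%
       }
   }
   ```

   Each row carries the error shown in the tables, so these percentages are approximate.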
   
   
   - `KRaftMetadataRequestBenchmark` is a version of an existing set of benchmarks that we have for ZooKeeper-based brokers.  It demonstrates a *potential* slowdown in read performance with the new code of up to roughly 10% when requesting metadata for all topics -- but the evidence is not particularly strong, as some cases actually perform better.  Overall there doesn't seem to be anything here that strongly cautions against this change.
   
   **KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics**
   **Current Code, unpatched**
   ```
   (partitionCount)  (topicCount)  Mode  Cnt         Score         Error  Units
                 10           500  avgt   15   1,372,555.544 ±   15168.485  ns/op
                 10          1000  avgt   15   2,726,198.798 ±   31911.035  ns/op
                 10          5000  avgt   15  41,553,723.361 ± 1394092.903  ns/op
                 20           500  avgt   15   2,373,810.148 ±   28320.684  ns/op
                 20          1000  avgt   15   5,077,005.645 ±  344757.315  ns/op
                 20          5000  avgt   15  50,903,118.952 ±  842824.639  ns/op
                 50           500  avgt   15   5,592,480.092 ±   38800.995  ns/op
                 50          1000  avgt   15  11,278,004.176 ±  140577.692  ns/op
                 50          5000  avgt   15  97,650,987.593 ± 2605902.517  ns/op
   ```
   
   **KRaftMetadataRequestBenchmark.testMetadataRequestForAllTopics**
   **Updated Code**
   ```
   (partitionCount)  (topicCount)  Mode  Cnt         Score         Error  Units
                 10           500  avgt   15   1,437,036.379 ±   10187.868  ns/op
                 10          1000  avgt   15   2,957,572.571 ±   39259.772  ns/op
                 10          5000  avgt   15  40,949,310.393 ±  354110.400  ns/op
                 20           500  avgt   15   2,493,094.563 ±   18131.215  ns/op
                 20          1000  avgt   15   5,224,699.766 ±  198612.245  ns/op
                 20          5000  avgt   15  55,643,648.154 ±  935800.708  ns/op
                 50           500  avgt   15   5,731,310.891 ±  289599.505  ns/op
                 50          1000  avgt   15  11,708,291.589 ±   63063.128  ns/op
                 50          5000  avgt   15  94,717,768.691 ± 1248511.062  ns/op
   ```
   
   There are 2 other benchmarks in the set that seem to perform comparably with and without the patch.
   
   **KRaftMetadataRequestBenchmark.testRequestToJson**
   **Current Code, unpatched**
   ```
   (partitionCount)  (topicCount)  Mode  Cnt         Score         Error  Units
                 10           500  avgt   15       674.874 ±      19.315  ns/op
                 10          1000  avgt   15       643.048 ±       4.638  ns/op
                 10          5000  avgt   15       696.829 ±      23.999  ns/op
                 20           500  avgt   15       672.617 ±       4.812  ns/op
                 20          1000  avgt   15       674.492 ±       6.206  ns/op
                 20          5000  avgt   15       677.546 ±       2.301  ns/op
                 50           500  avgt   15       682.702 ±       3.841  ns/op
                 50          1000  avgt   15       634.786 ±       6.009  ns/op
                 50          5000  avgt   15       678.107 ±       8.479  ns/op
   ```
   
   **KRaftMetadataRequestBenchmark.testRequestToJson**
   **Updated Code**
   ```
   (partitionCount)  (topicCount)  Mode  Cnt         Score         Error  Units
                 10           500  avgt   15       678.702 ±       3.331  ns/op
                 10          1000  avgt   15       659.232 ±       3.126  ns/op
                 10          5000  avgt   15       678.725 ±       5.893  ns/op
                 20           500  avgt   15       666.064 ±       2.042  ns/op
                 20          1000  avgt   15       670.959 ±       2.950  ns/op
                 20          5000  avgt   15       670.517 ±       2.473  ns/op
                 50           500  avgt   15       672.154 ±       7.125  ns/op
                 50          1000  avgt   15       665.008 ±       2.272  ns/op
                 50          5000  avgt   15       669.210 ±      27.191  ns/op
   ```
   
   **KRaftMetadataRequestBenchmark.testTopicIdInfo**
   **Current Code, unpatched**
   ```
   (partitionCount)  (topicCount)  Mode  Cnt         Score         Error  Units
                 10           500  avgt   15        10.760 ±       0.078  ns/op
                 10          1000  avgt   15        11.118 ±       0.281  ns/op
                 10          5000  avgt   15        10.882 ±       0.192  ns/op
                 20           500  avgt   15        10.822 ±       0.121  ns/op
                 20          1000  avgt   15        10.966 ±       0.569  ns/op
                 20          5000  avgt   15        10.764 ±       0.120  ns/op
                 50           500  avgt   15        10.823 ±       0.081  ns/op
                 50          1000  avgt   15        10.755 ±       0.154  ns/op
                 50          5000  avgt   15        10.694 ±       0.068  ns/op
   ```
   
   **KRaftMetadataRequestBenchmark.testTopicIdInfo**
   **Updated Code**
   ```
   (partitionCount)  (topicCount)  Mode  Cnt         Score         Error  Units
                 10           500  avgt   15        10.493 ±       0.056  ns/op
                 10          1000  avgt   15        10.507 ±       0.059  ns/op
                 10          5000  avgt   15        10.455 ±       0.055  ns/op
                 20           500  avgt   15        10.424 ±       0.035  ns/op
                 20          1000  avgt   15        10.476 ±       0.139  ns/op
                 20          5000  avgt   15        10.454 ±       0.079  ns/op
                 50           500  avgt   15        10.891 ±       0.152  ns/op
                 50          1000  avgt   15        10.509 ±       0.073  ns/op
                 50          5000  avgt   15        10.479 ±       0.059  ns/op
   ```
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162131601


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {

Review Comment:
   I don't feel strongly about it, but I agree that from a practical point of view it might be easier to have a factory method. Then you could just replace that factory method implementation with the Vavr implementation (or whatever) and rerun the JMH benchmarks without changing anything else.
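
   One way to read the factory-method suggestion, as a sketch: callers obtain maps only through a static factory, so switching libraries for a benchmark run becomes a one-line change. `PersistentMap`, `CopyOnWriteMap`, and `FactoryDemo` are hypothetical names, and the copy-on-write implementation is a deliberately naive placeholder, not any of the libraries discussed in this thread.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical, trimmed-down interface; only the factory names a concrete class.
   interface PersistentMap<K, V> {
       PersistentMap<K, V> updated(K key, V value);

       V get(K key);

       int size();

       static <K, V> PersistentMap<K, V> empty() {
           // Swap this line for another implementation (e.g. one backed by a
           // persistent-collections library) and rerun the benchmarks.
           return new CopyOnWriteMap<>(new HashMap<>());
       }
   }

   // Naive placeholder: copies the whole map on every update, i.e. O(n) per change.
   final class CopyOnWriteMap<K, V> implements PersistentMap<K, V> {
       private final Map<K, V> map;

       CopyOnWriteMap(Map<K, V> map) {
           this.map = map;
       }

       @Override
       public PersistentMap<K, V> updated(K key, V value) {
           Map<K, V> copy = new HashMap<>(map);
           copy.put(key, value);
           return new CopyOnWriteMap<>(copy);
       }

       @Override
       public V get(K key) {
           return map.get(key);
       }

       @Override
       public int size() {
           return map.size();
       }
   }

   public final class FactoryDemo {
       public static void main(String[] args) {
           PersistentMap<String, Integer> a = PersistentMap.empty();
           PersistentMap<String, Integer> b = a.updated("topic-1", 10);
           System.out.println(a.size() + " " + b.size() + " " + b.get("topic-1")); // 0 1 10
       }
   }
   ```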





[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1508967636

   @rondagostino BTW I still would like to understand what is wrong with calling `values()` on the map (if anything) but let's take that discussion offline and not block on it




[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1502416844

   > Yes, updated would only make sense on the map interface. For the Set interface, added seems reasonable.
   
   Sorry to bikeshed this again but I kinda like `withAdded`, `withRemoval`, etc. Curious how you feel about those.




[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1160254440


##########
checkstyle/import-control.xml:
##########
@@ -48,6 +48,8 @@
   <allow pkg="org.apache.kafka.common.utils" />
   <allow pkg="org.apache.kafka.common.errors" exact-match="true" />
   <allow pkg="org.apache.kafka.common.memory" />
+  <!-- anyone can use persistent collection factories/non-library-specific wrappers -->
+  <allow pkg="org.apache.kafka.pcoll" exact-match="true" />

Review Comment:
   Well, "anyone" on the server side :) I don't think we want the kafka client to take this dependency. At least not yet, until we have a very clear use-case in mind.
   
   So with that in mind, sadly it might be better to add this "allow" to server-common, metadata, and core individually, rather than up here (although that's slightly more work, I realize).





[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1161328104


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {
+    /**
+     * @return the underlying persistent map
+     */
+    Object underlying();

Review Comment:
   > Is there any need to expose the underlying object?
   
   I added it as an escape hatch.  We could make it private for now, though -- it's easy to expose it later if necessary.
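
   For context on the escape hatch: the interface can only promise `Object`, but Java lets an implementing class narrow the return type of an overriding method (a covariant return type). The PCollections-based implementation quoted later in this thread calls `underlying().plus(...)`, which presumably relies on exactly that. A self-contained sketch with hypothetical names, using an unmodifiable `java.util.Map` in place of a persistent-collection type:

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical stand-in for the escape hatch: the interface promises only Object...
   interface HasUnderlying {
       Object underlying();
   }

   // ...but an implementing class may narrow the return type (a covariant override),
   // so code holding the concrete type gets the backing collection without a cast.
   final class MapWrapper<K, V> implements HasUnderlying {
       private final Map<K, V> underlying;

       MapWrapper(Map<K, V> map) {
           // Unmodifiable defensive copy stands in for a persistent collection here.
           this.underlying = Collections.unmodifiableMap(new HashMap<>(map));
       }

       @Override
       public Map<K, V> underlying() {
           return underlying;
       }
   }

   public final class CovariantDemo {
       public static void main(String[] args) {
           MapWrapper<String, Integer> w = new MapWrapper<>(Collections.singletonMap("a", 1));
           int viaConcrete = w.underlying().get("a");             // typed as Map<String, Integer>
           Object viaInterface = ((HasUnderlying) w).underlying(); // typed as Object
           System.out.println(viaConcrete + " " + (viaInterface instanceof Map)); // 1 true
       }
   }
   ```

   Dropping `underlying()` from the interface, as suggested, would not prevent an implementation from keeping its own narrowly typed accessor for internal use.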





[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1167076930


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {
+    /**
+     * @return the underlying persistent map
+     */
+    Object underlying();
+
+    /**
+     * @param key the key
+     * @param value the value
+     * @return a wrapped persistent map that differs from this one in that the given mapping is added (if necessary)
+     */
+    ImmutableMap<K, V> updated(K key, V value);

Review Comment:
   Oh, I didn't realize this was an existing convention. Let's go with that then.



##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {
+    /**
+     * @return the underlying persistent map
+     */
+    Object underlying();
+
+    /**
+     * @param key the key
+     * @param value the value
+     * @return a wrapped persistent map that differs from this one in that the given mapping is added (if necessary)
+     */
+    ImmutableMap<K, V> updated(K key, V value);
+
+    /**
+     * @param key the key
+     * @return a wrapped persistent map that differs from this one in that the given mapping is removed (if necessary)
+     */
+    ImmutableMap<K, V> removed(K key);

Review Comment:
   Oh, I didn't realize this was an existing convention. Let's go with that then.
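
   A minimal sketch of the contract described in the Javadoc above: `updated` and `removed` return new maps and leave the receiver unchanged, while `java.util.Map` methods that mutate in place throw `UnsupportedOperationException`. `SketchImmutableMap` is hypothetical and copies the whole backing map on every change (O(n)), so it illustrates the semantics only, not the performance of a persistent map; returning `this` from `removed` when the key is absent is one possible reading of "(if necessary)".

   ```java
   import java.util.AbstractMap;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical sketch of the ImmutableMap contract (full copy per change, illustration only).
   final class SketchImmutableMap<K, V> extends AbstractMap<K, V> {
       private final Map<K, V> map;

       private SketchImmutableMap(Map<K, V> map) {
           this.map = map;
       }

       static <K, V> SketchImmutableMap<K, V> empty() {
           return new SketchImmutableMap<>(Collections.emptyMap());
       }

       SketchImmutableMap<K, V> updated(K key, V value) {
           Map<K, V> copy = new HashMap<>(map);
           copy.put(key, value);
           return new SketchImmutableMap<>(copy);
       }

       SketchImmutableMap<K, V> removed(K key) {
           if (!map.containsKey(key)) {
               return this; // nothing to remove, so no new map is needed
           }
           Map<K, V> copy = new HashMap<>(map);
           copy.remove(key);
           return new SketchImmutableMap<>(copy);
       }

       @Override
       public V get(Object key) {
           return map.get(key);
       }

       // AbstractMap.put() throws UnsupportedOperationException by default, and any
       // in-place mutation attempted through this unmodifiable view also throws.
       @Override
       public Set<Entry<K, V>> entrySet() {
           return Collections.unmodifiableMap(map).entrySet();
       }
   }

   public final class ContractDemo {
       public static void main(String[] args) {
           SketchImmutableMap<String, Integer> v1 = SketchImmutableMap.<String, Integer>empty().updated("a", 1);
           SketchImmutableMap<String, Integer> v2 = v1.updated("b", 2).removed("a");
           System.out.println(v1 + " " + v2); // {a=1} {b=2}
           try {
               v2.put("c", 3);
           } catch (UnsupportedOperationException e) {
               System.out.println("put rejected");
           }
       }
   }
   ```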





[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1169270141


##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -76,8 +84,8 @@ public TopicImage getTopic(String name) {
     }
 
     public void write(ImageWriter writer, ImageWriterOptions options) {
-        for (TopicImage topicImage : topicsById.values()) {
-            topicImage.write(writer, options);
+        for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) {
+            entry.getValue().write(writer, options);

Review Comment:
   Yeah, it was in the Paguro implementation where `values()` was deprecated.  We've moved away from that, but I left in the code change that stops using the `values()` collection.  This change doesn't harm anything.





[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1172120422


##########
server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable.pcollections;
+
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.pcollections.HashPMap;
+import org.pcollections.HashTreePMap;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+@SuppressWarnings("deprecation")
+public class PCollectionsImmutableMap<K, V> implements ImmutableMap<K, V> {
+
+    private final HashPMap<K, V> underlying;
+
+    /**
+     * @return a wrapped hash-based persistent map that is empty
+     * @param <K> the key type
+     * @param <V> the value type
+     */
+    public static <K, V> PCollectionsImmutableMap<K, V> empty() {
+        return new PCollectionsImmutableMap<>(HashTreePMap.empty());
+    }
+
+    /**
+     * @param key the key
+     * @param value the value
+     * @return a wrapped hash-based persistent map that has a single mapping
+     * @param <K> the key type
+     * @param <V> the value type
+     */
+    public static <K, V> PCollectionsImmutableMap<K, V> singleton(K key, V value) {
+        return new PCollectionsImmutableMap<>(HashTreePMap.singleton(key, value));
+    }
+
+    public PCollectionsImmutableMap(HashPMap<K, V> map) {
+        this.underlying = Objects.requireNonNull(map);
+    }
+
+    @Override
+    public ImmutableMap<K, V> updated(K key, V value) {
+        return new PCollectionsImmutableMap<>(underlying().plus(key, value));
+    }
+
+    @Override
+    public ImmutableMap<K, V> removed(K key) {
+        return new PCollectionsImmutableMap<>(underlying().minus(key));
+    }
+
+    @Override
+    public int size() {
+        return underlying().size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return underlying().isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return underlying().containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return underlying().containsValue(value);
+    }
+
+    @Override
+    public V get(Object key) {
+        return underlying().get(key);
+    }
+
+    @Override
+    public V put(K key, V value) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().put(key, value);
+    }
+
+    @Override
+    public V remove(Object key) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().remove(key);
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> m) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        underlying().putAll(m);
+    }
+
+    @Override
+    public void clear() {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        underlying().clear();
+    }
+
+    @Override
+    public Set<K> keySet() {
+        return underlying().keySet();
+    }
+
+    @Override
+    public Collection<V> values() {
+        return underlying().values();
+    }
+
+    @Override
+    public Set<Entry<K, V>> entrySet() {
+        return underlying().entrySet();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PCollectionsImmutableMap<?, ?> that = (PCollectionsImmutableMap<?, ?>) o;
+        return underlying().equals(that.underlying());
+    }
+
+    @Override
+    public int hashCode() {
+        return underlying().hashCode();
+    }
+
+    @Override
+    public V getOrDefault(Object key, V defaultValue) {
+        return underlying().getOrDefault(key, defaultValue);
+    }
+
+    @Override
+    public void forEach(BiConsumer<? super K, ? super V> action) {
+        underlying().forEach(action);
+    }
+
+    @Override
+    public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        underlying().replaceAll(function);
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().putIfAbsent(key, value);
+    }
+
+    @Override
+    public boolean remove(Object key, Object value) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().remove(key, value);
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().replace(key, oldValue, newValue);
+    }
+
+    @Override
+    public V replace(K key, V value) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().replace(key, value);
+    }
+
+    @Override
+    public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().computeIfAbsent(key, mappingFunction);
+    }
+
+    @Override
+    public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().computeIfPresent(key, remappingFunction);
+    }
+
+    @Override
+    public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().compute(key, remappingFunction);
+    }
+
+    @Override
+    public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().merge(key, value, remappingFunction);
+    }
+
+    @Override
+    public String toString() {
+        return "PCollectionsImmutableMap{" +
+            "underlying=" + underlying() +

Review Comment:
   I think it would be more usable if we simply had the key/values here versus exposing the nested structure. Similar for the other `toString` implementations.
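   A minimal sketch of the `toString` suggestion. It assumes a read-only wrapper over a plain `java.util.Map` that stands in for the library map. `ToStringDemoMap` and `nestedToString` are hypothetical names, not from the PR. The sketch contrasts the nested form used in the diff with a form that prints only the entries.

   ```java
   import java.util.AbstractMap;
   import java.util.Collections;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical read-only wrapper; a plain java.util.Map stands in for the
   // persistent library map that the real wrapper delegates to.
   final class ToStringDemoMap<K, V> extends AbstractMap<K, V> {
       private final Map<K, V> underlying;

       ToStringDemoMap(Map<K, V> underlying) {
           this.underlying = Collections.unmodifiableMap(underlying);
       }

       @Override
       public Set<Map.Entry<K, V>> entrySet() {
           return underlying.entrySet();
       }

       // Style in the diff: the wrapper's internal structure leaks into logs.
       String nestedToString() {
           return "PCollectionsImmutableMap{underlying=" + underlying + "}";
       }

       // Suggested style: just the key/value pairs.
       @Override
       public String toString() {
           return underlying.toString();
       }
   }
   ```

   With a `LinkedHashMap` holding `a=1, b=2`, the suggested form prints `{a=1, b=2}`. The nested form prints `PCollectionsImmutableMap{underlying={a=1, b=2}}`.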



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1500890626

   Yes, `updated` would only make sense on the map interface. For the Set interface, `added` seems reasonable.
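   The sketch below makes the naming concrete with a hypothetical set interface (`SketchImmutableSet`) that uses `added`/`removed`. It is backed by a naive copy-on-write stand-in (`CopyOnWriteSketchSet`), which copies on every change (O(n)) and is not a persistent structure. Only the calling convention is meant to carry over.

   ```java
   import java.util.AbstractSet;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Iterator;
   import java.util.Set;

   // Hypothetical interface: non-mutating "copy with change" methods for sets.
   interface SketchImmutableSet<E> extends Set<E> {
       SketchImmutableSet<E> added(E e);
       SketchImmutableSet<E> removed(Object o);
   }

   // Naive copy-on-write stand-in; a persistent library would share structure instead.
   final class CopyOnWriteSketchSet<E> extends AbstractSet<E> implements SketchImmutableSet<E> {
       private final Set<E> elements;

       CopyOnWriteSketchSet(Set<E> elements) {
           this.elements = Collections.unmodifiableSet(new HashSet<>(elements));
       }

       @Override
       public Iterator<E> iterator() {
           return elements.iterator(); // unmodifiable view: Iterator#remove throws
       }

       @Override
       public int size() {
           return elements.size();
       }

       @Override
       public SketchImmutableSet<E> added(E e) {
           if (elements.contains(e)) {
               return this; // already present: reuse this instance
           }
           Set<E> copy = new HashSet<>(elements);
           copy.add(e);
           return new CopyOnWriteSketchSet<>(copy);
       }

       @Override
       public SketchImmutableSet<E> removed(Object o) {
           if (!elements.contains(o)) {
               return this;
           }
           Set<E> copy = new HashSet<>(elements);
           copy.remove(o);
           return new CopyOnWriteSketchSet<>(copy);
       }
   }
   ```

   In-place `add`/`remove` inherited from `AbstractSet` throw `UnsupportedOperationException`. The original set is never changed by `added`/`removed`.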




[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1161382595


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {

Review Comment:
   The whole point is abstracting things from users. I don't think the users should be making the call regarding the library to be used and I don't think we should be including the name of the library in the static methods being discussed here. If we need to expose a collection from a different library because it has specific properties (say performance), then we'd use a name that makes _that_ clear (i.e. it's not about the library being used, but the type of collection property we want).





[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1160986944


##########
server-common/src/main/java/org/apache/kafka/pcoll/PHashMapWrapper.java:
##########
@@ -69,41 +48,4 @@ default <T> Map<K, T> asJava(Function<V, T> valueMapping) {
      * @return a wrapped persistent map that differs from this one in that the given mapping is removed (if necessary)
      */
     PHashMapWrapper<K, V> afterRemoving(K key);

Review Comment:
   How about calling this `removed` and the other one `updated`? That's the convention `scala` uses for non symbolic names and it seems to read a bit better than `after*`.





[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1500542906

   > I think we have to implement Set, Map, and TreeMap.  Using custom types just hurts programmer productivity and ability to understand the code too much... [it] is a lot of tricky code that we don't really need,
   
   I don't think we have to if we go with a wrapper class -- it is trivial to invoke `asJava()` if the programmer wants something that implements the standard interface.  If we did want it, it is trivial to delegate all those invocations when the underlying library classes already implement those interfaces -- which is the case with `PCollections`.  But yes, it would be annoying to have to write it for something like Vavr.  This is why I felt that the `asJava()` method is a good solution.
   
   > There is a tension... should we ditch the wrappers and use the libraries directly?
   
   Agreed, there's no perfect solution.  We either sprinkle the PCollections code all over the place and have to migrate all of that, or we abstract it away somehow.  I think we should abstract it away but not get too crazy about it.  I think not implementing the standard interfaces, supporting `.asJava()`, and providing type safety so people at least know they have a persistent collection is a good balance.
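   A minimal sketch of the wrapper-plus-`asJava()` approach described here. It assumes a plain unmodifiable `java.util.Map` as the stand-in for the library's persistent map. `PersistentMapWrapperSketch` and `empty()` are hypothetical. `afterAdding` and `asJava` follow the names used in the PR at this point in the discussion, but the signatures here are simplified; for example, the PR's `asJava` also took a value-mapping function.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical wrapper: deliberately NOT a java.util.Map, so the type itself tells
   // callers they hold a persistent collection; asJava() converts on demand.
   final class PersistentMapWrapperSketch<K, V> {
       private final Map<K, V> underlying; // stand-in for the library's persistent map

       private PersistentMapWrapperSketch(Map<K, V> underlying) {
           this.underlying = underlying;
       }

       static <K, V> PersistentMapWrapperSketch<K, V> empty() {
           return new PersistentMapWrapperSketch<>(Collections.emptyMap());
       }

       // Returns a new wrapper; this one is unchanged. The O(n) copy is only a
       // stand-in for a library's structure-sharing "plus" operation.
       PersistentMapWrapperSketch<K, V> afterAdding(K key, V value) {
           Map<K, V> copy = new HashMap<>(underlying);
           copy.put(key, value);
           return new PersistentMapWrapperSketch<>(Collections.unmodifiableMap(copy));
       }

       // Read-only java.util.Map view for code that expects the standard interface.
       Map<K, V> asJava() {
           return underlying;
       }
   }
   ```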
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] emissionnebula commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "emissionnebula (via GitHub)" <gi...@apache.org>.
emissionnebula commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1161315454


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {

Review Comment:
   +1



##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {
+    /**
+     * @return the underlying persistent map
+     */
+    Object underlying();

Review Comment:
   Is there any need to expose the underlying object? Since we want to use only `ImmutableMap` and `java.util.Map` everywhere in the code, there would be no need to expose the underlying `PCollections` type from this interface. 





[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162134827


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {
+    /**
+     * @return the underlying persistent map
+     */
+    Object underlying();
+
+    /**
+     * @param key the key
+     * @param value the value
+     * @return a wrapped persistent map that differs from this one in that the given mapping is added (if necessary)
+     */
+    ImmutableMap<K, V> updated(K key, V value);

Review Comment:
   how do you guys feel about `withUpdate` as a name?



##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {
+    /**
+     * @return the underlying persistent map
+     */
+    Object underlying();
+
+    /**
+     * @param key the key
+     * @param value the value
+     * @return a wrapped persistent map that differs from this one in that the given mapping is added (if necessary)
+     */
+    ImmutableMap<K, V> updated(K key, V value);
+
+    /**
+     * @param key the key
+     * @return a wrapped persistent map that differs from this one in that the given mapping is removed (if necessary)
+     */
+    ImmutableMap<K, V> removed(K key);

Review Comment:
   how do you guys feel about `withRemoval` as a name?





[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1500342559

   Thanks for the review, @cmccabe.  No problem with the package change and the more precise import controls.
   
   Regarding the wrapper suggestion:
   
   It is true that the wrapper implementation in the PR as it stands right now always creates a new object when creating a new persistent collection (either by creating an empty/singleton or by adding/removing from an existing one).  The overhead is low -- object creation generally isn't onerous in time or space, and the work created by having the additional object will be a very low percentage of the work that the underlying persistent collection has to do anyway.  So it doesn't concern me very much in that regard, but I do agree we should avoid it if it is unnecessary.
   
   PCollections and Paguro both support using the `java.util` interfaces directly on their implementation classes, but Vavr (or whatever else we might decide to abstract away like this) does not.  Such libraries would require the use of a wrapper class as is proposed here.  The `PersistentCollectionFactory` you proposed would still work -- it would just take wrapper instances and delegate to those, which would in turn delegate to the underlying wrapped persistent collection.  So there would be 2 levels of indirection instead of 1 for those libraries (the factory always has to do the downcast and perform the delegation, but in the Vavr-like cases it would cast the input to a wrapper and delegate to that, which would then in turn do the delegation again to the underlying library object).   But delegation is cheap, so I would have no problem with that.  In short, being required to introduce a wrapper for some libraries doesn't necessarily require us to introduce one for PCollections.  
   
   The last issue I can think of that we should discuss here is type safety.  I would prefer to be able to know that something is in fact a persistent collection.  Unfortunately there is no way to keep that distinction with your proposed factory definition.  This drawback gives me pause.
   
   So I think we have a situation where the current PR introduces a small amount of overhead that is not expected to be significant but that we would prefer to eliminate if possible.  But the cost of eliminating it is a lack of type safety -- the compiler cannot know whether a collection is a persistent one or not.
   
   Is this a fair discussion of the tradeoffs?
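   The type-safety point can be illustrated with a hypothetical marker interface. `PersistentMarkerMap`, `EmptyPersistentMarkerMap`, and `Snapshots` are illustrative names, not from the PR. When the persistent type appears in a signature, the compiler rejects a plain `HashMap`. When a factory hands back only `java.util.Map`, the same requirement can at best be checked at runtime.

   ```java
   import java.util.AbstractMap;
   import java.util.Collections;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical marker type: a map the compiler knows is persistent/immutable.
   interface PersistentMarkerMap<K, V> extends Map<K, V> { }

   // Trivial implementation, just enough to pass to the methods below.
   final class EmptyPersistentMarkerMap<K, V> extends AbstractMap<K, V> implements PersistentMarkerMap<K, V> {
       @Override
       public Set<Map.Entry<K, V>> entrySet() {
           return Collections.emptySet();
       }
   }

   final class Snapshots {
       // Typed approach: passing a java.util.HashMap here is a compile error.
       static <K, V> int sizeOfSnapshot(PersistentMarkerMap<K, V> snapshot) {
           return snapshot.size();
       }

       // Plain-Map approach: the requirement can only be enforced at runtime.
       static <K, V> int sizeOfSnapshotUnchecked(Map<K, V> snapshot) {
           if (!(snapshot instanceof PersistentMarkerMap)) {
               throw new IllegalArgumentException("expected a persistent map");
           }
           return snapshot.size();
       }
   }
   ```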
   
   
   




[GitHub] [kafka] emissionnebula commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "emissionnebula (via GitHub)" <gi...@apache.org>.
emissionnebula commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1480238165

   Thanks @rondagostino for the PR. I am also evaluating different persistent libraries for the [removal of the R/W lock from StandardAuthorizer](https://github.com/apache/kafka/pull/13437). I checked three libraries - pcollections, Paguro and Vavr. 
   
   For the StandardAuthorizer, `pcollections` appears to show read-time performance similar to the existing code and about 3-4x that of AclAuthorizer. Vavr and Paguro, however, perform very badly on reads: around 5000x slower than the current code. This is despite Paguro having the best write-time performance, with `pcollections` writes taking around 20x as long as Paguro's.
   
   Because of read performance, pcollections looks like a clear choice for the Authorizer use case. For the KRaft metadata image change use case, should we evaluate the performance with `pcollections` as well?




[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1499641310

   Thanks for working on this, @rondagostino and @showuon ! I think this will be a really cool improvement.
   
   A note about namespace names:
   
   Rather than having an abbreviated namespace like `org.apache.kafka.pcoll` , how about `org.apache.kafka.server.persistent` ? This emphasizes that the code is in `server-common` and is more consistent with our other namespace names, which mostly don't use abbreviations.
   
   I don't have any objection to wrapping pcollections so that you can swap in a different library in the future. However, I don't like the wrappers here for two main reasons:
   1. They don't implement standard collection interfaces (java.util.Map, java.util.Set)
   2. They involve an additional "wrapper object" for every operation (every addition, etc.)
   
   The first issue is easy to fix. If you look at TimelineHashMap.java and TimelineHashSet.java, you can see that I implemented the standard Map and Set operations without too much fuss. It is a lot better than just implementing `Iterable` and some getters or having awkward "convert this to a real java collection" methods.
   
   The second issue is harder to fix because of the nature of Java's type system. However, I suspect that you might be able to do it by having something like a `PCollectionFactory` that implemented `PersistentCollectionFactory`. Then you could have methods like:
   
   ```java
   import java.util.Map;

   interface PersistentCollectionFactory {
       <K, V> Map<K, V> emptyMap();
       <K, V> Map<K, V> singletonMap(K k, V v);
       <K, V> Map<K, V> afterAdding(Map<K, V> map, K k, V v);
       <K, V> Map<K, V> afterRemoving(Map<K, V> map, K k, V v);
       <K, V> Map<K, V> afterRemoving(Map<K, V> map, K k);
       // ...
   }
   ```
   
   This would basically let you avoid carting around a wrapper object for each new map you create. At the same time, you could avoid dealing with pcollections types entirely in the calling code, and just refer to java.util.Map, etc. Since the pcollections types do implement the standard Java types, this would work well here.
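   The following sketches what an implementation of such a factory could look like. It uses a naive copy-on-write `HashMap` as a stand-in for pcollections. `CopyOnWriteCollectionFactory` is a hypothetical name. The three-argument `afterRemoving` is assumed to mirror `Map#remove(key, value)`, removing the key only when it maps to the given value. Because callers only ever see `java.util.Map`, no per-operation wrapper object is allocated. A real implementation would return the library's own map type, which already implements `java.util.Map`, and would share structure rather than copy.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Objects;

   // Hypothetical factory; the HashMap copies are O(n) stand-ins for
   // structure-sharing persistent operations.
   final class CopyOnWriteCollectionFactory {
       <K, V> Map<K, V> emptyMap() {
           return Collections.emptyMap();
       }

       <K, V> Map<K, V> singletonMap(K k, V v) {
           return Collections.singletonMap(k, v);
       }

       <K, V> Map<K, V> afterAdding(Map<K, V> map, K k, V v) {
           Map<K, V> copy = new HashMap<>(map);
           copy.put(k, v);
           return Collections.unmodifiableMap(copy);
       }

       // Assumed semantics: remove k only if it is currently mapped to v.
       <K, V> Map<K, V> afterRemoving(Map<K, V> map, K k, V v) {
           if (!map.containsKey(k) || !Objects.equals(map.get(k), v)) {
               return map; // nothing to remove: return the input unchanged
           }
           return afterRemoving(map, k);
       }

       <K, V> Map<K, V> afterRemoving(Map<K, V> map, K k) {
           if (!map.containsKey(k)) {
               return map;
           }
           Map<K, V> copy = new HashMap<>(map);
           copy.remove(k);
           return Collections.unmodifiableMap(copy);
       }
   }
   ```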




[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1500606628

   We can implement the `java.util` interfaces in the wrapper classes.  It is trivial to do so via delegation when the underlying library classes already implement those interfaces -- which is the case with `PCollections`.
   
   It will be more difficult for libraries like Vavr that do not support those interfaces directly.  See, for example, the `VavrMapAsJava` class I implemented in the Vavr commit, https://github.com/apache/kafka/pull/13280/commits/f195c2055bad96da2a189ff35ea0dd9263da5d6e.  But it was not onerous.
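   Some libraries' maps do not implement `java.util.Map`. For those, one low-effort adapter extends `java.util.AbstractMap` and supplies only `entrySet()`, inheriting the read methods and the mutators that throw `UnsupportedOperationException`. This is a sketch. `ConsMap` is a hypothetical, deliberately tiny immutable association list standing in for a Vavr-style map, and `ConsMapAsJava` is not the PR's `VavrMapAsJava`.

   ```java
   import java.util.AbstractMap;
   import java.util.AbstractSet;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.NoSuchElementException;
   import java.util.Objects;
   import java.util.Set;

   // Hypothetical "library" map that does NOT implement java.util.Map:
   // an immutable association list where put/remove return new instances.
   final class ConsMap<K, V> {
       private static final ConsMap<?, ?> EMPTY = new ConsMap<Object, Object>(null, null, null, 0);
       final K key;
       final V value;
       final ConsMap<K, V> next;
       final int size;

       private ConsMap(K key, V value, ConsMap<K, V> next, int size) {
           this.key = key;
           this.value = value;
           this.next = next;
           this.size = size;
       }

       @SuppressWarnings("unchecked")
       static <K, V> ConsMap<K, V> empty() {
           return (ConsMap<K, V>) EMPTY;
       }

       boolean isEmpty() {
           return size == 0;
       }

       ConsMap<K, V> put(K k, V v) {
           ConsMap<K, V> rest = remove(k); // drop any old mapping for k
           return new ConsMap<>(k, v, rest, rest.size + 1);
       }

       ConsMap<K, V> remove(Object k) {
           if (isEmpty()) return this;
           if (Objects.equals(key, k)) return next;
           ConsMap<K, V> rest = next.remove(k);
           return rest == next ? this : new ConsMap<>(key, value, rest, rest.size + 1);
       }
   }

   // Adapter: AbstractMap derives get/containsKey/equals/hashCode/toString from entrySet().
   final class ConsMapAsJava<K, V> extends AbstractMap<K, V> {
       private final ConsMap<K, V> map;

       ConsMapAsJava(ConsMap<K, V> map) {
           this.map = map;
       }

       @Override
       public Set<Map.Entry<K, V>> entrySet() {
           return new AbstractSet<Map.Entry<K, V>>() {
               @Override public int size() { return map.size; }

               @Override public Iterator<Map.Entry<K, V>> iterator() {
                   return new Iterator<Map.Entry<K, V>>() {
                       private ConsMap<K, V> cur = map;

                       @Override public boolean hasNext() { return !cur.isEmpty(); }

                       @Override public Map.Entry<K, V> next() {
                           if (cur.isEmpty()) throw new NoSuchElementException();
                           Map.Entry<K, V> e = new AbstractMap.SimpleImmutableEntry<>(cur.key, cur.value);
                           cur = cur.next;
                           return e;
                       }
                   };
               }
           };
       }
   }
   ```

   Lookups through `AbstractMap#get` are linear scans of `entrySet()`. An adapter over a real hash-trie library would also override `get` and `containsKey` to delegate to the library's own lookup.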
   
   




[GitHub] [kafka] ijuma commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1500671475

   My suggestion is to introduce a new interface like `ImmutableMap` (`PMap` or `PersistentMap` are also possible, but they can be confused with a map that persists the data) and introduce new methods to that interface. Our code should only interact with `ImmutableMap`, `java.util.Map`, etc.
   
   For code that doesn't need the new methods exposed via `ImmutableMap`, we can choose to use `java.util.Map` if we like.
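   A sketch of that shape, using hypothetical names (`ImmutableMapSketch`, `CopyOnWriteImmutableMap`, `ImmutableMapUsage`). The interface extends `java.util.Map` and adds non-mutating `updated`/`removed` methods, the names that appear later in this thread. Read-only code keeps accepting plain `Map`. The copy-on-write implementation only exercises the interface and costs O(n) per change, unlike a persistent collection.

   ```java
   import java.util.AbstractMap;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical interface: a java.util.Map plus "copy with change" methods.
   interface ImmutableMapSketch<K, V> extends Map<K, V> {
       ImmutableMapSketch<K, V> updated(K key, V value);
       ImmutableMapSketch<K, V> removed(K key);
   }

   // Naive stand-in implementation; in-place mutators inherited from AbstractMap throw.
   final class CopyOnWriteImmutableMap<K, V> extends AbstractMap<K, V> implements ImmutableMapSketch<K, V> {
       private final Map<K, V> entries;

       CopyOnWriteImmutableMap(Map<K, V> entries) {
           this.entries = Collections.unmodifiableMap(new HashMap<>(entries));
       }

       @Override
       public Set<Map.Entry<K, V>> entrySet() {
           return entries.entrySet();
       }

       @Override
       public V get(Object key) {
           return entries.get(key); // hash lookup instead of AbstractMap's linear scan
       }

       @Override
       public ImmutableMapSketch<K, V> updated(K key, V value) {
           Map<K, V> copy = new HashMap<>(entries);
           copy.put(key, value);
           return new CopyOnWriteImmutableMap<>(copy);
       }

       @Override
       public ImmutableMapSketch<K, V> removed(K key) {
           Map<K, V> copy = new HashMap<>(entries);
           copy.remove(key);
           return new CopyOnWriteImmutableMap<>(copy);
       }
   }

   final class ImmutableMapUsage {
       // Code that only reads can keep taking the standard interface.
       static int countEntries(Map<String, ?> map) {
           return map.size();
       }
   }
   ```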


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162131601


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {

Review Comment:
   I don't feel very strongly about this either way. I like the elegance of having a separate factory instead of static functions, but I do wonder if that will make it harder to do a quick replacement of the current implementation with the Vavr implementation (or whatever) and rerun the JMH benchmarks without changing anything else. Maybe it's fine as-is, though?





[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162128357


##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -76,8 +84,8 @@ public TopicImage getTopic(String name) {
     }
 
     public void write(ImageWriter writer, ImageWriterOptions options) {
-        for (TopicImage topicImage : topicsById.values()) {
-            topicImage.write(writer, options);
+        for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) {
+            entry.getValue().write(writer, options);

Review Comment:
   The `java.util.Collection` returned by `java.util.Map#values` is pretty basic. I can't find any comment about how its equals method is supposed to work. For `HashMap#values`, it seems to just be doing reference equality. (In contrast, `Map#keySet` returns an actual Set which is compared how you would expect.)
   
   @rondagostino, like you, I wrote a test program with `java.util.HashMap<String, String>` and `java.util.HashSet<String>` and got very similar results:
   ```
   foo = {a->a, b->b}
   bar = {a, b}
   foo.values().equals(foo.values()) = true
   new HashSet<>(foo.values()).equals(bar) = true
   foo.values().equals(bar) = false
   bar.equals(foo.values()) = false
   foo.keySet().equals(bar) = true
   bar.equals(foo.keySet()) = true
   ```
   
   > We could, but it is marked deprecated in the library because there is no way to provide a reasonable .equals() method. I actually checked, and indeed it is true:
   
   What version of the pcollections source code were you looking at? I downloaded the source from https://github.com/hrldcpr/pcollections and wasn't able to find any comment or deprecated indicator for `HashPMap#values()`. In fact, it looks like it simply inherits the `AbstractMap#values` implementation without any changes. I suspect that this will actually implement reference equality, since this implementation saves the Collection object it creates in a `transient` field (ew).
   
   But even leaving that aside, I can't find any API guarantees about what the equals method of the collection returned by `Map#values` is supposed to do. It's possible that this is just undefined. At any rate, the existing behavior of the java.util.Map implementations is certainly useless here (it will not be what anyone expects).
   
   `Collection#equals` says you can do whatever you want for `equals`, but you should "exercise care". Zooming out a bit, the big picture is that interfaces like List or Set define a reasonable behavior for equals, whereas Collection (which is the base interface for both) is just like :shrug:
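   The comparison above can be reproduced with a small program. This is a sketch, and `ValuesEqualityDemo` is a hypothetical name. It uses only `java.util.HashMap` and `java.util.HashSet`. In OpenJDK, the `true` result for `foo.values().equals(foo.values())` follows from `HashMap` caching its values view. Both calls therefore return the same object, and the identity-based `equals` inherited from `Object` matches.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.Set;

   // Reproduces the comparisons above using only java.util classes.
   final class ValuesEqualityDemo {
       static Map<String, Boolean> run() {
           Map<String, String> foo = new HashMap<>();
           foo.put("a", "a");
           foo.put("b", "b");
           Set<String> bar = new HashSet<>();
           bar.add("a");
           bar.add("b");

           // Insertion-ordered so the results print in the same order as the list above.
           Map<String, Boolean> results = new LinkedHashMap<>();
           results.put("foo.values().equals(foo.values())", foo.values().equals(foo.values()));
           results.put("new HashSet<>(foo.values()).equals(bar)", new HashSet<>(foo.values()).equals(bar));
           results.put("foo.values().equals(bar)", foo.values().equals(bar));
           results.put("bar.equals(foo.values())", bar.equals(foo.values()));
           results.put("foo.keySet().equals(bar)", foo.keySet().equals(bar));
           results.put("bar.equals(foo.keySet())", bar.equals(foo.keySet()));
           return results;
       }
   }
   ```

   `bar.equals(foo.values())` is `false` because `AbstractSet#equals` requires the other object to be a `Set`. The collection returned by `values()` is not a `Set`.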





[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1446491240

   The `topicIdInfo` benchmark now performs equivalently.  I eliminated the `TranslatedValueMapView` class in favor of adding a value mapper to the `VavrMapAsJava` class.
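   A sketch of the value-mapper idea. It assumes a read-only view that applies a `Function` to values on access rather than building a translated copy. `ValueMappedView` is a hypothetical name. The PR's `VavrMapAsJava` wraps a Vavr map, whereas this sketch wraps any `java.util.Map`.

   ```java
   import java.util.AbstractMap;
   import java.util.AbstractSet;
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Set;
   import java.util.function.Function;

   // Hypothetical read-only view: values are translated lazily, nothing is copied.
   final class ValueMappedView<K, V, T> extends AbstractMap<K, T> {
       private final Map<K, V> underlying;
       private final Function<V, T> valueMapper;

       ValueMappedView(Map<K, V> underlying, Function<V, T> valueMapper) {
           this.underlying = underlying;
           this.valueMapper = valueMapper;
       }

       @Override
       public boolean containsKey(Object key) {
           return underlying.containsKey(key);
       }

       @Override
       public T get(Object key) {
           // Direct lookup in the underlying map instead of AbstractMap's linear scan.
           if (!underlying.containsKey(key)) {
               return null;
           }
           return valueMapper.apply(underlying.get(key));
       }

       @Override
       public Set<Map.Entry<K, T>> entrySet() {
           return new AbstractSet<Map.Entry<K, T>>() {
               @Override public int size() { return underlying.size(); }

               @Override public Iterator<Map.Entry<K, T>> iterator() {
                   Iterator<Map.Entry<K, V>> it = underlying.entrySet().iterator();
                   return new Iterator<Map.Entry<K, T>>() {
                       @Override public boolean hasNext() { return it.hasNext(); }

                       @Override public Map.Entry<K, T> next() {
                           Map.Entry<K, V> e = it.next();
                           // Translate each value as it is iterated; remove() is unsupported.
                           return new AbstractMap.SimpleImmutableEntry<>(e.getKey(), valueMapper.apply(e.getValue()));
                       }
                   };
               }
           };
       }
   }
   ```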


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1161382595


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {

Review Comment:
   The whole point is abstracting things from users. I don't think they should be making the call and I don't think we should be including the name of the library. If we need to expose a collection from a different library because it has specific properties (say performance), then we'd use a name that makes _that_ clear.





[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162191919


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {
+    /**
+     * @return the underlying persistent map
+     */
+    Object underlying();
+
+    /**
+     * @param key the key
+     * @param value the value
+     * @return a wrapped persistent map that differs from this one in that the given mapping is added (if necessary)
+     */
+    ImmutableMap<K, V> updated(K key, V value);

Review Comment:
   I was trying to avoid inventing a new naming convention and hence went with what `Scala` had chosen. If there is another convention with precedent that we like better, I'm happy to discuss.
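   To show what the Scala-style names are meant to convey, here is a minimal sketch of a hypothetical `CopyOnWriteMap` with `updated` and `removed`: each returns a new map and leaves the receiver untouched. It deliberately copies every entry on each change, which is exactly the O(N) cost the PR removes; a persistent map offers the same semantics while sharing most of its structure with the previous version.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable map illustrating updated()/removed() semantics via full copies. */
public final class CopyOnWriteMap<K, V> {
    private final Map<K, V> entries;

    private CopyOnWriteMap(Map<K, V> entries) {
        this.entries = Collections.unmodifiableMap(entries);
    }

    public static <K, V> CopyOnWriteMap<K, V> empty() {
        return new CopyOnWriteMap<>(new HashMap<>());
    }

    /** Returns a new map with the mapping added; this map is unchanged. Costs O(N). */
    public CopyOnWriteMap<K, V> updated(K key, V value) {
        Map<K, V> copy = new HashMap<>(entries);
        copy.put(key, value);
        return new CopyOnWriteMap<>(copy);
    }

    /** Returns a new map without the key; this map is unchanged. */
    public CopyOnWriteMap<K, V> removed(K key) {
        if (!entries.containsKey(key)) {
            return this; // nothing to remove, so the existing instance can be shared
        }
        Map<K, V> copy = new HashMap<>(entries);
        copy.remove(key);
        return new CopyOnWriteMap<>(copy);
    }

    /** Read-only java.util.Map view; its mutators throw UnsupportedOperationException. */
    public Map<K, V> asMap() {
        return entries;
    }
}
```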





[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1161329362


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {

Review Comment:
   If we add static methods here, then those methods would have to hard-code which underlying library to use.  Right now we just have PCollections because it meets a mandatory requirement for the ACL lock removal work, but it is possible we might find either a need or a significant performance advantage to using something else for some use case.  Any static methods here might have to have the underlying library in the name.  For example, a method name here might be `emptyMapPCollections()` (or `pCollectionsEmptyMap()`).  I assumed it would be easy for clients to just explicitly invoke `PCOLLECTIONS_FACTORY.emptySet()` (for example) -- or, more likely, they would have access to the factory already (like `TopicsImage` does, as it declares `private static final ImmutableMapSetFactory FACTORY = ImmutableMapSetFactory.PCOLLECTIONS_FACTORY;`) and could invoke `FACTORY.emptySet()`.
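   The factory indirection described here can be sketched as follows. All names (`ImmutableSetFactory`, `CopyingSetFactory`, `added`, `TopicNamesHolder`) are hypothetical stand-ins, not the PR's `ImmutableMapSetFactory`; the copying implementation only keeps the example free of third-party dependencies. The point is that client code names the backing library exactly once, in a single `FACTORY` constant.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical library-agnostic factory; each implementation picks one backing library. */
interface ImmutableSetFactory {
    <E> Set<E> emptySet();

    /** Returns a new set containing the element; the input set is left unchanged. */
    <E> Set<E> added(Set<E> set, E element);
}

/** Hypothetical stand-in that copies; a real one would delegate to a persistent library. */
final class CopyingSetFactory implements ImmutableSetFactory {
    static final ImmutableSetFactory INSTANCE = new CopyingSetFactory();

    @Override
    public <E> Set<E> emptySet() {
        return Collections.emptySet();
    }

    @Override
    public <E> Set<E> added(Set<E> set, E element) {
        Set<E> copy = new HashSet<>(set);
        copy.add(element);
        return Collections.unmodifiableSet(copy);
    }
}

/** Hypothetical client: switching libraries means changing only the FACTORY line. */
public class TopicNamesHolder {
    private static final ImmutableSetFactory FACTORY = CopyingSetFactory.INSTANCE;

    private final Set<String> names;

    public TopicNamesHolder() {
        this(FACTORY.emptySet());
    }

    private TopicNamesHolder(Set<String> names) {
        this.names = names;
    }

    public TopicNamesHolder withName(String name) {
        return new TopicNamesHolder(FACTORY.added(names, name));
    }

    public Set<String> names() {
        return names;
    }
}
```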





[GitHub] [kafka] showuon commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1439780222

   @rondagostino , also, it looks like this library doesn't support JDK 8? 




[GitHub] [kafka] rondagostino merged pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino merged PR #13280:
URL: https://github.com/apache/kafka/pull/13280




[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1442082585

   Marking this as a Draft PR until we have a solution for replacing Paguro.  One library that extends the `java.util` interfaces is [PCollections](https://github.com/hrldcpr/pcollections), but that has some potential performance concerns as described [here](https://github.com/GlenKPeterson/Paguro/wiki/UncleJim-vs.-PCollections).
   
   [vavr](https://github.com/vavr-io/vavr) is inspired by Scala, so it culturally fits here.  Its classes do not implement the `java.util` interfaces, so I expect it will feel weird for methods to return a `HashMap` that turns out to be an `io.vavr.collection.HashMap` rather than the `java.util.HashMap` that one would assume.  It also means signatures need to change: something that returns a `Map` can no longer do so, and callers will be impacted.  We're not talking about a KIP here, but we are talking about a bigger code impact than what this PR currently has.




[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1114679640


##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -76,8 +84,8 @@ public TopicImage getTopic(String name) {
     }
 
     public void write(ImageWriter writer, ImageWriterOptions options) {
-        for (TopicImage topicImage : topicsById.values()) {
-            topicImage.write(writer, options);
+        for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) {
+            entry.getValue().write(writer, options);

Review Comment:
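   We could use `values()`, but it is marked deprecated in the library because there is no way to provide a reasonable `.equals()` method for it.  I actually checked, and indeed that is true:
   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.junit.jupiter.api.Assertions.assertNotEquals;

   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;

   import org.junit.jupiter.api.Test;

   public class MapValuesEqualityTest {
       @Test
       public void testMapValuesEquality() {
           Map<String, String> m = new HashMap<>();
           m.put("a", "a");
           m.put("b", "b");
           // keySet() is a Set, so Set#equals applies and ordering is irrelevant
           assertEquals(m.keySet(), new HashSet<>(Arrays.asList("a", "b")));
           assertEquals(m.keySet(), new HashSet<>(Arrays.asList("b", "a")));
           // values() is neither a Set nor a List, so these all assert inequality
           assertNotEquals(m.values(), new HashSet<>(Arrays.asList("a", "b")));
           assertNotEquals(m.values(), Arrays.asList("a", "b"));
           assertNotEquals(m.values(), new HashSet<>(Arrays.asList("b", "a")));
           assertNotEquals(m.values(), Arrays.asList("b", "a"));
       }
   }
   ```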
   





[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1503436183

   > documented it which we can as well.
   
   Yes, the Javadoc uses the term `persistent`.
   
   I'm fine with the package/class/method names as they exist right now.
   
   




[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1161383163


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {
+    /**
+     * @return the underlying persistent map
+     */
+    Object underlying();

Review Comment:
   Escape hatches should generally not be added unless they're actually needed.





[GitHub] [kafka] emissionnebula commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "emissionnebula (via GitHub)" <gi...@apache.org>.
emissionnebula commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1502743377

   > @ijuma @rondagostino : I feel like "persistent" should appear somewhere in the class names here. Perhaps you're right that it doesn't need to be in the short class name, but can we put the classes in a namespace that includes that word? So something like `org.apache.kafka.server.persistent`? Then we'd have `org.apache.kafka.server.persistent.ImmutableMap`, etc.
   
   > PMap or PersistentMap are also possible, but they can be confused with a map that persists the data
   
   I feel we should not only use `persistent` in the package name but also name the class `PersistentMap`. Persistent is not a new term: a 1989 [paper](https://www.cs.cmu.edu/~sleator/papers/making-data-structures-persistent.pdf) coined it. Clojure also defines them as `PersistentVector` and `PersistentMap`. Languages such as Scala, Elixir, and Haskell use the term persistent in their documentation as well. 
   




[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162129955


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {
+    /**
+     * @return the underlying persistent map
+     */
+    Object underlying();

Review Comment:
   Yeah, I think it would be good to avoid this unless there is a concrete reason we need it...





[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1494811199

   The persistent collection library space remains unsettled (it is frustrating that this is still the case after so many years). I therefore added wrapper classes and import controls so that we don't end up sprinkling library-specific code all over the place.  This will help insulate us from changes in this space over time.
   
   Given that the `PCollections` benchmarks are a massive improvement over the existing code, we can just go with `PCollections` here, since that is what is needed in https://github.com/apache/kafka/pull/13437.




[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1172120422


##########
server-common/src/main/java/org/apache/kafka/server/immutable/pcollections/PCollectionsImmutableMap.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable.pcollections;
+
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.pcollections.HashPMap;
+import org.pcollections.HashTreePMap;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+@SuppressWarnings("deprecation")
+public class PCollectionsImmutableMap<K, V> implements ImmutableMap<K, V> {
+
+    private final HashPMap<K, V> underlying;
+
+    /**
+     * @return a wrapped hash-based persistent map that is empty
+     * @param <K> the key type
+     * @param <V> the value type
+     */
+    public static <K, V> PCollectionsImmutableMap<K, V> empty() {
+        return new PCollectionsImmutableMap<>(HashTreePMap.empty());
+    }
+
+    /**
+     * @param key the key
+     * @param value the value
+     * @return a wrapped hash-based persistent map that has a single mapping
+     * @param <K> the key type
+     * @param <V> the value type
+     */
+    public static <K, V> PCollectionsImmutableMap<K, V> singleton(K key, V value) {
+        return new PCollectionsImmutableMap<>(HashTreePMap.singleton(key, value));
+    }
+
+    public PCollectionsImmutableMap(HashPMap<K, V> map) {
+        this.underlying = Objects.requireNonNull(map);
+    }
+
+    @Override
+    public ImmutableMap<K, V> updated(K key, V value) {
+        return new PCollectionsImmutableMap<>(underlying().plus(key, value));
+    }
+
+    @Override
+    public ImmutableMap<K, V> removed(K key) {
+        return new PCollectionsImmutableMap<>(underlying().minus(key));
+    }
+
+    @Override
+    public int size() {
+        return underlying().size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return underlying().isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return underlying().containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return underlying().containsValue(value);
+    }
+
+    @Override
+    public V get(Object key) {
+        return underlying().get(key);
+    }
+
+    @Override
+    public V put(K key, V value) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().put(key, value);
+    }
+
+    @Override
+    public V remove(Object key) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().remove(key);
+    }
+
+    @Override
+    public void putAll(Map<? extends K, ? extends V> m) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        underlying().putAll(m);
+    }
+
+    @Override
+    public void clear() {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        underlying().clear();
+    }
+
+    @Override
+    public Set<K> keySet() {
+        return underlying().keySet();
+    }
+
+    @Override
+    public Collection<V> values() {
+        return underlying().values();
+    }
+
+    @Override
+    public Set<Entry<K, V>> entrySet() {
+        return underlying().entrySet();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PCollectionsImmutableMap<?, ?> that = (PCollectionsImmutableMap<?, ?>) o;
+        return underlying().equals(that.underlying());
+    }
+
+    @Override
+    public int hashCode() {
+        return underlying().hashCode();
+    }
+
+    @Override
+    public V getOrDefault(Object key, V defaultValue) {
+        return underlying().getOrDefault(key, defaultValue);
+    }
+
+    @Override
+    public void forEach(BiConsumer<? super K, ? super V> action) {
+        underlying().forEach(action);
+    }
+
+    @Override
+    public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        underlying().replaceAll(function);
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().putIfAbsent(key, value);
+    }
+
+    @Override
+    public boolean remove(Object key, Object value) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().remove(key, value);
+    }
+
+    @Override
+    public boolean replace(K key, V oldValue, V newValue) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().replace(key, oldValue, newValue);
+    }
+
+    @Override
+    public V replace(K key, V value) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().replace(key, value);
+    }
+
+    @Override
+    public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().computeIfAbsent(key, mappingFunction);
+    }
+
+    @Override
+    public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().computeIfPresent(key, remappingFunction);
+    }
+
+    @Override
+    public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().compute(key, remappingFunction);
+    }
+
+    @Override
+    public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
+        // will throw UnsupportedOperationException; delegate anyway for testability
+        return underlying().merge(key, value, remappingFunction);
+    }
+
+    @Override
+    public String toString() {
+        return "PCollectionsImmutableMap{" +
+            "underlying=" + underlying() +

Review Comment:
   I think it would be more usable if we simply had the key/values here versus exposing the nested structure.
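   One way to follow this suggestion, sketched with a hypothetical wrapper class rather than the PR's `PCollectionsImmutableMap`, is to render only the entries in the familiar `java.util.Map` format (`{k1=v1, k2=v2}`) instead of printing the wrapper name and the nested library object.

```java
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;

/**
 * Hypothetical map wrapper whose toString() prints just the entries,
 * e.g. "{a=1, b=2}", instead of "Wrapper{underlying=...}".
 */
public final class EntriesToStringWrapper<K, V> {
    private final Map<K, V> underlying;

    public EntriesToStringWrapper(Map<K, V> underlying) {
        this.underlying = Objects.requireNonNull(underlying);
    }

    @Override
    public String toString() {
        // Same shape as AbstractMap#toString: "{k1=v1, k2=v2}", or "{}" when empty.
        StringJoiner joiner = new StringJoiner(", ", "{", "}");
        for (Map.Entry<K, V> entry : underlying.entrySet()) {
            joiner.add(entry.getKey() + "=" + entry.getValue());
        }
        return joiner.toString();
    }
}
```

   If the wrapped map already extends `java.util.AbstractMap`, simply delegating to its `toString()` would produce the same format.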





[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1500464493

   > It is true that the wrapper implementation in the PR as it stands right now always creates a new object when creating a new persistent collection (either by creating an empty/singleton or by adding/removing from an existing one). The overhead is low -- object creation isn't onerous in time or space when object initialization is cheap (which it is here), and the work created by having the additional object will be a very low percentage of the work that the underlying persistent collection has to do anyway. So it doesn't concern me very much in that regard, but I do agree we should avoid it if we decide it is unnecessary and any tradeoff ends up being worth it.
   
   The performance issues are debatable, I'll definitely grant you that. And I feel kind of bad mentioning them considering what a big performance improvement this will be in general. Probably the bigger issue is the extra complexity of implementing our own Set / Map / TreeMap / etc. It's all doable (and perhaps we'll end up doing it for Vavr) but it feels kind of bad when the library has already done that work.
   
   > So I think we have a situation where the current PR introduces a small amount of overhead that is not expected to be significant but that we would prefer to eliminate if possible. But the cost of eliminating it is a lack of type safety -- the compiler would not be able to know if a collection is a persistent one or not.
   
   I agree that the solution I proposed lacks type safety. It would be possible for someone to pass a non-persistent map to one of the `PersistentCollectionFactory` methods. Or, of course, to pass a different kind of persistent collection than what is expected.
   
   If type safety is a requirement, it would be possible to add a third generic parameter which is the type of the Map. But this gets back to your original objection to exposing these types, which is that people could then invoke methods directly on the pcollections map type. The users would also need to pull in the pcollections imports and so on. I feel that there is no advantage to going down this path versus just using pcollections directly, since the effort to switch to something else later would not be lessened.
   
   Another option is to actually handle the different types in an appropriate fashion. For example, if someone gives you a non-pcollections `java.util.HashMap`, you can implement `afterAdding` by copying it with the requested addition. Hmm... now that I think about this more, it actually sounds like a pretty good option. I suppose in that case we could just have a bunch of static methods in `org.apache.kafka.server.persistent.PersistentCollection` like `PersistentCollection.afterAdding(K k, V v)` and implement them appropriately for all persistent collection types we support...
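   The static-helper idea can be sketched roughly as below. `PersistentCollectionHelpers` and the `CheapUpdatable` marker interface are hypothetical names standing in for whatever persistent types would actually be supported; `afterAdding` follows the name used in the comment, with the map passed explicitly. A plain `java.util.Map` takes the copying fallback, which makes the trade-off visible: the cost of the same call (a cheap persistent update vs. an O(N) copy) depends on the argument's runtime type.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical static helpers that accept any java.util.Map. */
public final class PersistentCollectionHelpers {
    /** Hypothetical marker for a persistent map that can produce an updated version cheaply. */
    public interface CheapUpdatable<K, V> extends Map<K, V> {
        Map<K, V> afterAdding(K key, V value);
    }

    private PersistentCollectionHelpers() {
    }

    /** Returns a map with the mapping added; the input map is never mutated. */
    public static <K, V> Map<K, V> afterAdding(Map<K, V> map, K key, V value) {
        if (map instanceof CheapUpdatable) {
            // Persistent case: structure sharing, typically much cheaper than a copy.
            return ((CheapUpdatable<K, V>) map).afterAdding(key, value);
        }
        // Plain java.util.Map: fall back to a full O(N) copy with the addition applied.
        Map<K, V> copy = new HashMap<>(map);
        copy.put(key, value);
        return Collections.unmodifiableMap(copy);
    }
}
```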




[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1500536745

   > I think you are referring to having to implement that if we don't go with the current PR implementation and instead go with something like what you proposed -- since the current PR wrapper implementations don't implement the Set/Map interfaces. Is that correct? In other words, I think you are arguing against your proposal -- I just want to be sure.
   
   So firstly, as per the earlier discussion in the PR, I think we have to implement `Set`, `Map`, and `TreeMap`. Using custom types just hurts programmer productivity and ability to understand the code too much. Obviously there are going to be some differences with the `java.util` versions, but hopefully those can be minimal, so people's intuitions about what is O(1), O(N), O(log N) can be correct, as well as people's understanding of what the code does.
   
   So the contest here is between:
   1. implementing your own Set, Map, and TreeMap wrappers above the library code;
   2. implementing a helper class like the one I suggested above; or
   3. just using pcollections directly, and adding a wrapper class only for the libraries that need it, like Vavr.
    
   My main argument is that option 1 is a lot of tricky code that we don't really need, and likely worse performance (it's hard to argue about it without benchmarks, of course).
   
   3 will make it slightly harder to try different collections libraries. Although in theory we could ask people to use our own helper functions rather than PCollections-specific methods, to minimize the delta needed. Perhaps this is an avenue worth exploring.
   
   > I think it is important for people to be able to easily understand what performance they are going to get from a function. If that's not clear, and we end up giving O(1) performance if the downcast to a persistent collection can occur vs. O(N) performance if it cannot, I think that would make it more likely for O(N) performance to occur inadvertently. Especially since people won't necessarily be able to know what type they have just by reading the immediate code: if all they know is that they have a java.util.Map then they might have to trace back to figure out where they actually got that, what the actual type is, and therefore what O() behavior they are going to get if they try to add something to it.
   
   There is a tension between what you are trying to do here (abstract over multiple libraries) and the idea that "people [need] to be able to easily understand what performance they are going to get from a function." If a new library we add later is somewhat better at get() and worse at put(), does that mean that people using your wrappers can't "easily understand what performance they are going to get"? Arguably yes. So should we ditch the wrappers and use the libraries directly?
   
   Ultimately the test of performance is our JMH benchmarks, as well as the profiles we take in production. I believe we should rely on those, rather than assuming we can deduce everything about performance ahead of time...



[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1500522402

   > Probably the bigger issue is the extra complexity of implementing our own Set / Map / TreeMap / etc. It's all doable (and perhaps we'll end up doing it for Vavr) but it feels kind of bad when the library has already done that work.
   
   I think you are referring to having to implement that if we don't go with the current PR implementation and instead go with something like what you proposed, since the current PR wrapper implementations don't implement the Set/Map interfaces. Is that correct? In other words, I think you are arguing against your proposal; I just want to be sure.
   
   > handle the different types in an appropriate fashion
   > `java.util.HashMap`... implement `afterAdding` by copying it with the requested addition
   
   I think it is important for people to be able to easily understand what performance they are going to get from a function. If that's not clear, and we end up giving O(1) performance if the downcast to a persistent collection can occur vs. O(N) performance if it cannot, I think that would make it more likely for O(N) performance to occur inadvertently. Especially since people won't necessarily be able to know what type they have just by reading the immediate code: if all they know is that they have a `java.util.Map` then they might have to trace back to figure out where they actually got that, what the actual type is, and therefore what O() behavior they are going to get if they try to add something to it.
   
   > have a bunch of static methods in org.apache.kafka.server.persistent.PersistentCollection like PersistentCollection.afterAdding(K k, V v) and implement them appropriately for all persistent collection types we support.
   
   I'm not following this suggestion.  I don't think this would give us type safety.  It feels like it is just the wrapper class I've implemented here.  I'm perhaps missing something.
   
   Overall, I'm feeling like type safety is not worth giving up for the very small benefit of eliminating the wrapper class.
   



[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1160986944


##########
server-common/src/main/java/org/apache/kafka/pcoll/PHashMapWrapper.java:
##########
@@ -69,41 +48,4 @@ default <T> Map<K, T> asJava(Function<V, T> valueMapping) {
      * @return a wrapped persistent map that differs from this one in that the given mapping is removed (if necessary)
      */
     PHashMapWrapper<K, V> afterRemoving(K key);

Review Comment:
   How about calling this `removed` and the other one `added`? That's the convention `scala` uses for non symbolic names and it seems to read a bit better than `after*`.




[GitHub] [kafka] ijuma commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1503626134

   > I'm fine with the package/class/method names as they exist right now.
   
   +1



[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1500610824

   > Also, as you yourself argued, converting back to java.util.Map loses type safety, since now we don't know if we have a persistent map or not. It is quite useful to know that your map is immutable, even if I doubt that confusion between regular types and persistent ones will be as common as you suggest.
   
   Yeah, I was including `.asJava()` really only as a way to avoid changing our existing return types. If something returns a `java.util.Map<>` and now we are implementing the logic with persistent data structures, do we want to change that signature to indicate that the returned map is persistent? When we weren't considering the use of a wrapper class it was very intrusive to change the signature to say we are now returning a Vavr map; the ripple effects went everywhere.
   
   But I guess this does argue for implementing the standard interfaces in the wrapper.  It would allow us to change the signature but downstream code is unaffected.  That code might declare `Map<Class1, Class2> x = someObject.someMethod()` and that will still compile when we declare `someMethod()` to return a wrapper class when that wrapper class implements `java.util.Map`.  That insulates us from ripple-effect changes while simultaneously giving callers the option to actually know that the map is a persistent one.
   
   I think this is the argument that convinces me to implement the standard interfaces in the wrapper classes.  I'll add it now.
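A minimal sketch of that insulation argument, assuming a hypothetical `ImmutableMapLike` wrapper interface that extends `java.util.Map`. `CopyOnWriteImmutableMap` copies on every update purely to keep the example self-contained; a real wrapper would delegate to a persistent library map instead. The point is `PartitionCountSource` (also hypothetical): its method can advertise the persistent type, while existing callers that declare a plain `Map` keep compiling and in-place mutators still throw.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical wrapper interface: it IS a java.util.Map, plus a non-mutating update.
interface ImmutableMapLike<K, V> extends Map<K, V> {
    ImmutableMapLike<K, V> updated(K key, V value);
}

// Illustration only: copies on every update (O(N)), unlike a persistent map.
final class CopyOnWriteImmutableMap<K, V> extends AbstractMap<K, V>
        implements ImmutableMapLike<K, V> {
    private final Map<K, V> delegate;

    CopyOnWriteImmutableMap(Map<K, V> source) {
        this.delegate = Collections.unmodifiableMap(new HashMap<>(source));
    }

    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        return delegate.entrySet(); // read-only view; AbstractMap.put already throws
    }

    @Override
    public V get(Object key) {
        return delegate.get(key); // hash lookup instead of AbstractMap's linear scan
    }

    @Override
    public ImmutableMapLike<K, V> updated(K key, V value) {
        Map<K, V> copy = new HashMap<>(delegate);
        copy.put(key, value);
        return new CopyOnWriteImmutableMap<>(copy);
    }
}

// Hypothetical producer whose return type was changed from Map to the wrapper.
final class PartitionCountSource {
    ImmutableMapLike<String, Integer> partitionCounts() {
        return new CopyOnWriteImmutableMap<>(new HashMap<String, Integer>())
            .updated("foo", 3);
    }
}
```

Callers that want the stronger guarantee can declare `ImmutableMapLike<String, Integer>` and call `updated` without any downcast.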
   



[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1114692109


##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -38,15 +40,21 @@
  */
 public final class TopicsImage {
     public static final TopicsImage EMPTY =
-        new TopicsImage(Collections.emptyMap(), Collections.emptyMap());
+        new TopicsImage(map(), map());
+
+    final ImMap<Uuid, TopicImage> topicsById;
+    final ImMap<String, TopicImage> topicsByName;
 
-    private final Map<Uuid, TopicImage> topicsById;
-    private final Map<String, TopicImage> topicsByName;
+    public TopicsImage(ImMap<Uuid, TopicImage> topicsById,
+                       ImMap<String, TopicImage> topicsByName) {
+        this.topicsById = topicsById;
+        this.topicsByName = topicsByName;
+    }
 
-    public TopicsImage(Map<Uuid, TopicImage> topicsById,
-                       Map<String, TopicImage> topicsByName) {
-        this.topicsById = Collections.unmodifiableMap(topicsById);
-        this.topicsByName = Collections.unmodifiableMap(topicsByName);
+    public TopicsImage including(TopicImage topic) {

Review Comment:
   It is used from a test class in the `core` module so it needs to be public :-(




[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1114673066


##########
metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java:
##########
@@ -126,29 +127,27 @@ public void handleMetadataVersionChange(MetadataVersion newVersion) {
     }
 
     public TopicsImage apply() {
-        Map<Uuid, TopicImage> newTopicsById = new HashMap<>(image.topicsById().size());
-        Map<String, TopicImage> newTopicsByName = new HashMap<>(image.topicsByName().size());
-        for (Entry<Uuid, TopicImage> entry : image.topicsById().entrySet()) {
-            Uuid id = entry.getKey();
-            TopicImage prevTopicImage = entry.getValue();
-            TopicDelta delta = changedTopics.get(id);
-            if (delta == null) {
-                if (!deletedTopicIds.contains(id)) {
-                    newTopicsById.put(id, prevTopicImage);
-                    newTopicsByName.put(prevTopicImage.name(), prevTopicImage);
-                }
+        ImMap<Uuid, TopicImage> newTopicsById = image.topicsById;
+        ImMap<String, TopicImage> newTopicsByName = image.topicsByName;
+        // apply all the deletes
+        for (Uuid topicId: deletedTopicIds) {
+            // it was deleted, so we have to remove it from the maps
+            TopicImage originalTopicToBeDeleted = image.topicsById.get(topicId);
+            if (originalTopicToBeDeleted == null) {
+                throw new IllegalStateException("Missing topic id " + topicId);
             } else {
-                TopicImage newTopicImage = delta.apply();
-                newTopicsById.put(id, newTopicImage);
-                newTopicsByName.put(delta.name(), newTopicImage);
+                newTopicsById = newTopicsById.without(topicId);
+                newTopicsByName = newTopicsByName.without(originalTopicToBeDeleted.name());
             }
         }
-        for (Entry<Uuid, TopicDelta> entry : changedTopics.entrySet()) {
-            if (!newTopicsById.containsKey(entry.getKey())) {
-                TopicImage newTopicImage = entry.getValue().apply();
-                newTopicsById.put(newTopicImage.id(), newTopicImage);
-                newTopicsByName.put(newTopicImage.name(), newTopicImage);
-            }
+        // apply all the updates/additions
+        for (Map.Entry<Uuid, TopicDelta> entry: changedTopics.entrySet()) {
+            Uuid topicId = entry.getKey();
+            TopicImage newTopicToBeAddedOrUpdated = entry.getValue().apply();
+            // put new information into the maps
+            String topicName = newTopicToBeAddedOrUpdated.name();
+            newTopicsById = newTopicsById.assoc(topicId, newTopicToBeAddedOrUpdated);
+            newTopicsByName = newTopicsByName.assoc(topicName, newTopicToBeAddedOrUpdated);

Review Comment:
   Persistent data structures are immutable, so they always create and return a new data structure.
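To make that concrete, here is a minimal sketch of a persistent singly linked list, written for illustration only; it is not how pcollections, Paguro, or Vavr implement their maps. "Adding" never touches the existing list: it allocates one new node that points at the old list, so both versions stay valid and share structure. Persistent maps typically apply the same idea to tree nodes, copying only the path to the changed entry, which is what lets `assoc` and `without` in the diff avoid copying every topic.

```java
import java.util.NoSuchElementException;

// Illustrative persistent (immutable) singly linked list.
final class PList<E> {
    private static final PList<?> EMPTY = new PList<Object>(null, null, 0);

    private final E head;
    private final PList<E> tail;
    private final int size;

    private PList(E head, PList<E> tail, int size) {
        this.head = head;
        this.tail = tail;
        this.size = size;
    }

    @SuppressWarnings("unchecked")
    static <E> PList<E> empty() {
        return (PList<E>) EMPTY;
    }

    // Returns a NEW list with the element in front. The receiver is not
    // modified; the new list reuses it as its tail, so this is O(1).
    PList<E> plus(E element) {
        return new PList<>(element, this, size + 1);
    }

    int size() {
        return size;
    }

    E head() {
        if (size == 0) {
            throw new NoSuchElementException("empty list");
        }
        return head;
    }

    PList<E> tail() {
        if (size == 0) {
            throw new NoSuchElementException("empty list");
        }
        return tail;
    }
}
```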




[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162107641


##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -21,41 +21,48 @@
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.apache.kafka.server.immutable.ImmutableMapSetFactory;
 import org.apache.kafka.server.util.TranslatedValueMapView;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-
 /**
  * Represents the topics in the metadata image.
  *
  * This class is thread-safe.
  */
 public final class TopicsImage {
-    public static final TopicsImage EMPTY =
-        new TopicsImage(Collections.emptyMap(), Collections.emptyMap());
+    private static final ImmutableMapSetFactory FACTORY = ImmutableMapSetFactory.PCOLLECTIONS_FACTORY;
+
+    public static final TopicsImage EMPTY =  new TopicsImage(FACTORY.emptyMap(), FACTORY.emptyMap());
 
-    private final Map<Uuid, TopicImage> topicsById;
-    private final Map<String, TopicImage> topicsByName;
+    final ImmutableMap<Uuid, TopicImage> topicsById;

Review Comment:
   I realize that this is immutable and very unlikely to ever be computed on the fly, but I would still really like to avoid direct field access. If we start doing stuff like this, people will tend to copy the pattern where it can cause harm (for example, where the map is mutable, or where we might want to compute the value on the fly). And there is no benefit here, so let's avoid it.
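A minimal sketch of the accessor style being requested, using a simplified, hypothetical `TopicNamesImage` class rather than the real `TopicsImage`: the field stays `private` and callers go through a method, so the representation can later change (for example, to a value computed on the fly) without touching call sites.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for TopicsImage (topic name -> topic id).
final class TopicNamesImage {
    // private: no caller can reach the field directly.
    private final Map<String, String> topicIdsByName;

    TopicNamesImage(Map<String, String> topicIdsByName) {
        // Defensive copy plus read-only view, so later changes to the caller's
        // map cannot leak into this image.
        this.topicIdsByName = Collections.unmodifiableMap(new HashMap<>(topicIdsByName));
    }

    // The only way in. If this ever needs to be computed lazily, only this
    // method has to change.
    Map<String, String> topicIdsByName() {
        return topicIdsByName;
    }

    String topicId(String name) {
        return topicIdsByName.get(name);
    }
}
```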




[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162128357


##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -76,8 +84,8 @@ public TopicImage getTopic(String name) {
     }
 
     public void write(ImageWriter writer, ImageWriterOptions options) {
-        for (TopicImage topicImage : topicsById.values()) {
-            topicImage.write(writer, options);
+        for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) {
+            entry.getValue().write(writer, options);

Review Comment:
   The `java.util.Collection` returned by `java.util.Map#values` is pretty basic. I can't find any comment about how its equals method is supposed to work. For `HashMap#values`, it seems to just be doing reference equality. (In contrast, `Map#keySet` returns an actual Set which is compared how you would expect.)
   
   @rondagostino, like you, I wrote a test program with `java.util.HashMap<String, String>` and `java.util.HashSet<String>` and got very similar results:
   ```
   foo = {a->a, b->b}
   bar = {a, b}
   foo.values().equals(foo.values()) = true
   new HashSet<>(foo.values()).equals(bar) = true
   foo.values().equals(bar) = false
   bar.equals(foo.values()) = false
   foo.keySet().equals(bar) = true
   bar.equals(foo.keySet()) = true
   ```
   
   > We could, but it is marked deprecated in the library because there is no way to provide a reasonable .equals() method. I actually checked, and indeed it is true:
   
   What version of the pcollections source code were you looking at? I downloaded the source from https://github.com/hrldcpr/pcollections and wasn't able to find any comment or deprecated indicator for `HashPMap#values()`. In fact, it looks like it simply inherits the `AbstractMap#values` implementation without any changes. I suspect that this will actually implement reference equality, since this implementation saves the Collection object it creates in a `transient` field (ew)
   
   But even leaving that aside, I can't find any API guarantees about what the equals method of the collection returned by `Map#values` is supposed to do. It's possible that this is just undefined. At any rate, the existing behavior of the java.util.Map subclasses is certainly useless here (it will not be what anyone expects).
   
   `Collection#equals` says you can do whatever you want for `equals`, but you should "exercise care". Zooming out a bit, the big picture is that interfaces like List or Set define a reasonable behavior for equals, whereas Collection (which is a parent interface for both) is just like :shrug: 
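Since the `values()` view offers no useful `equals`, a test that needs to compare map values has to choose a semantics explicitly. A minimal sketch, assuming multiset semantics are wanted; `CollectionComparisons.sameValues` is a hypothetical helper, not a JDK or Kafka API. It counts occurrences of each value, because wrapping the view in a `HashSet` as in the experiment above silently collapses duplicate values (two topics mapping to the same value would compare equal to a one-element set).

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical test helper: compares two collections as multisets, ignoring
// order and ignoring whatever equals() the collections themselves implement.
final class CollectionComparisons {
    private CollectionComparisons() {
    }

    static boolean sameValues(Collection<?> a, Collection<?> b) {
        return occurrences(a).equals(occurrences(b));
    }

    // Counts how many times each element appears (HashMap allows a null key).
    private static Map<Object, Integer> occurrences(Collection<?> values) {
        Map<Object, Integer> counts = new HashMap<>();
        for (Object value : values) {
            counts.merge(value, 1, Integer::sum);
        }
        return counts;
    }
}
```

Counting is expected O(N) and, unlike sorting both sides into lists, does not require the values to be `Comparable`.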




[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1500597366

   It's messy to have to go through a conversion function to use standard interfaces. The fact that we have to do it in Scala is a big negative for Scala. I'm greatly looking forward to no longer having to do this once we get rid of Scala. I would like to never again have to convert between (or ponder the difference between) a `collection.Seq` and a `collection.immutable.Seq`, or a Scala map versus a Java map.
   
   Also, as you yourself argued, converting back to java.util.Map loses type safety, since now we don't know if we have a persistent map or not. It is quite useful to know that your map is immutable, even if I doubt that confusion between regular types and persistent ones will be as common as you suggest.
    
   Most importantly, I don't think the delta between a wrapper type that implements java.util.Map and one that doesn't is that big. Like I said earlier, we did this in TimelineHashMap and other places. While I'd prefer to not have wrappers, if we must have them they should not require downcasting to be useful. If you want I can raise a PR to implement the standard interfaces.



[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162191307


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {

Review Comment:
   I think this looks pretty clean now.




[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1500734932

   Thanks for the review @ijuma. I think you have suggested 2 name changes: one to rename the wrapper interfaces and one to rename the methods on those interfaces.
   
   Current interface names:
   
   PHash{Map,Set}Wrapper
   
   Proposed interface names:
   
   Immutable{Map,Set}
   Persistent{Map,Set}
   P{Map,Set}
   
   
   I don't have a strong preference, so I'll go with Immutable{Map,Set} given your comment about the potential dual meaning of persistent.
   
   Regarding the method names.
   
   Current method names:
   
   newSet = origSet.afterAdding(e);
   newMap = origMap.afterRemoving(k);
   
   These read pretty well for me.
   
   Proposed names:
   
   newSet = origSet.updated(e);
   newMap = origMap.removed(k);
   
   I'm fine with this style, though maybe "origSet.added(e);" would be better?
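For comparison, a minimal sketch of how the Scala-style names read in Java, assuming a hypothetical `ImmutableSetLike` interface with `added` and `removed`. `CopyingImmutableSet` copies on every call purely to stay self-contained; a persistent set would share structure instead. The inherited in-place `add` still throws `UnsupportedOperationException`, matching the contract described in the `ImmutableMap` Javadoc quoted in this thread.

```java
import java.util.AbstractSet;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Hypothetical interface using the Scala-style method names.
interface ImmutableSetLike<E> extends Set<E> {
    ImmutableSetLike<E> added(E e);
    ImmutableSetLike<E> removed(Object o);
}

// Illustration only: each call copies (O(N)), unlike a real persistent set.
final class CopyingImmutableSet<E> extends AbstractSet<E> implements ImmutableSetLike<E> {
    private final Set<E> elements;

    CopyingImmutableSet(Set<E> elements) {
        this.elements = Collections.unmodifiableSet(new HashSet<>(elements));
    }

    @Override
    public Iterator<E> iterator() {
        return elements.iterator(); // unmodifiable iterator
    }

    @Override
    public int size() {
        return elements.size();
    }

    @Override
    public boolean contains(Object o) {
        return elements.contains(o);
    }

    // newSet = origSet.added(e): returns a new set, leaves this one untouched.
    @Override
    public ImmutableSetLike<E> added(E e) {
        Set<E> copy = new HashSet<>(elements);
        copy.add(e);
        return new CopyingImmutableSet<>(copy);
    }

    @Override
    public ImmutableSetLike<E> removed(Object o) {
        Set<E> copy = new HashSet<>(elements);
        copy.remove(o);
        return new CopyingImmutableSet<>(copy);
    }
}
```

Note that `add` (inherited from `AbstractCollection`) still throws; only the past-tense names return new instances.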
   
   
   



[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1502415755

   @ijuma @rondagostino : I feel like "persistent" should appear somewhere in the class names here. Perhaps you're right that it doesn't need to be in the short class name, but can we put the classes in a namespace that includes that word? So something like `org.apache.kafka.server.persistent`? Then we'd have `org.apache.kafka.server.persistent.ImmutableMap`, etc.



[GitHub] [kafka] ijuma commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1503264226

   I don't see strong motivation for having that in the name. Most of the examples you have simply document it, which we can do as well.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1508964167

   OK. It sounds like the consensus is that we should keep the "immutable" naming then.
   
   Thanks, @rondagostino , @showuon , @emissionnebula , @ijuma . LGTM



[GitHub] [kafka] emissionnebula commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "emissionnebula (via GitHub)" <gi...@apache.org>.
emissionnebula commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1505166712

   > I don't see strong motivation for having that in the name. Most of the examples you have simply document it, which we can do as well.
   
   Yeah, correct. Scala doesn't include it in the name, even though its immutable collections are persistent.
   
   I am also fine to follow the current naming which conforms to Scala.



[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1445517695

   I removed Paguro in favor of [Vavr](https://github.com/vavr-io/vavr). The PR size would have been much larger without the addition of the `Vavr{Set,Map}AsJava` classes that wrap the Vavr maps and sets as their unmodifiable `java.util` equivalents. I've updated the PR description to show the Vavr benchmark numbers alongside the Paguro ones. They are comparable, though the `topicIdInfo` benchmark shows the several extra nanoseconds it takes to instantiate the wrapper class for the Vavr implementation.
   
   



[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1161139673


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {

Review Comment:
   Did we consider adding the methods to create collections here as static methods?




[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1500625439

   Type safety is a tricky thing. There's no hard and fast rule about where to stop :)
   
   I do genuinely think encoding "this is persistent and therefore immutable" in the type system is a good thing... maybe it's my history as a C++ programmer (heh)
   
   Anyway sounds good



[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162184505


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {

Review Comment:
   I updated it.  I removed the factory in favor of static methods on the interface.
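The sketch below illustrates the "static factory methods on the interface" pattern. All names in it (`ImmutableMapSketch`, `empty()`, `singleton()`, `updated()`, `removed()`, `CopyOnWriteImmutableMap`) are hypothetical, not the PR's actual API. The backing class copies the whole map on every update, so the sketch shows only the API shape: callers never name the implementation, and in-place `java.util.Map` mutators are rejected. It does not model the structural sharing that makes the real change cheap.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical interface: factories live here, so callers never see the backing class. */
interface ImmutableMapSketch<K, V> extends Map<K, V> {
    static <K, V> ImmutableMapSketch<K, V> empty() {
        return new CopyOnWriteImmutableMap<K, V>(Collections.<K, V>emptyMap());
    }

    static <K, V> ImmutableMapSketch<K, V> singleton(K key, V value) {
        return ImmutableMapSketch.<K, V>empty().updated(key, value);
    }

    /** Returns a new map with the mapping added or replaced; this map is unchanged. */
    ImmutableMapSketch<K, V> updated(K key, V value);

    /** Returns a new map without the given key; this map is unchanged. */
    ImmutableMapSketch<K, V> removed(K key);
}

/**
 * Naive stand-in that copies on every update (O(n)). AbstractMap's default put()
 * throws UnsupportedOperationException, and removals through the unmodifiable
 * entrySet() are rejected too.
 */
final class CopyOnWriteImmutableMap<K, V> extends AbstractMap<K, V> implements ImmutableMapSketch<K, V> {
    private final Map<K, V> contents;

    CopyOnWriteImmutableMap(Map<K, V> contents) {
        this.contents = contents;
    }

    @Override
    public ImmutableMapSketch<K, V> updated(K key, V value) {
        Map<K, V> copy = new HashMap<>(contents);
        copy.put(key, value);
        return new CopyOnWriteImmutableMap<K, V>(Collections.unmodifiableMap(copy));
    }

    @Override
    public ImmutableMapSketch<K, V> removed(K key) {
        Map<K, V> copy = new HashMap<>(contents);
        copy.remove(key);
        return new CopyOnWriteImmutableMap<K, V>(Collections.unmodifiableMap(copy));
    }

    @Override
    public V get(Object key) {
        return contents.get(key); // avoid AbstractMap's linear scan
    }

    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        return contents.entrySet();
    }
}

public class ImmutableMapSketchDemo {
    public static void main(String[] args) {
        ImmutableMapSketch<String, Integer> v1 = ImmutableMapSketch.singleton("topic-a", 1);
        ImmutableMapSketch<String, Integer> v2 = v1.updated("topic-b", 2);
        System.out.println(v1.size() + " " + v2.size()); // 1 2
    }
}
```

Because call sites only mention the interface's static methods, the backing library could be swapped without touching them.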





[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162184665


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {
+    /**
+     * @return the underlying persistent map
+     */
+    Object underlying();

Review Comment:
   I removed it.





[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162191307


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {

Review Comment:
   I think this looks pretty clean.





[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "ijuma (via GitHub)" <gi...@apache.org>.
ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1161382595


##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {

Review Comment:
   The whole point is abstracting things from users. I don't think the users should be making the call regarding the library to be used and I don't think we should be including the name of the library. If we need to expose a collection from a different library because it has specific properties (say performance), then we'd use a name that makes _that_ clear.



##########
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##########
@@ -0,0 +1,47 @@
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw UnsupportedOperationException
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface ImmutableMap<K, V> extends Map<K, V> {

Review Comment:
   The whole point is abstracting things from users. I don't think the users should be making the call regarding the library to be used and I don't think we should be including the name of the library in the static methods being discussed here. If we need to expose a collection from a different library because it has specific properties (say performance), then we'd use a name that makes _that_ clear.





[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1512118780

   Here are some interesting GC results, obtained via `./jmh.sh --prof gc TopicsImageSingleRecord`. Allocation per operation used to grow linearly with the topic count, reaching megabytes. It now stays roughly flat at under 2 KB.
   **Old Code**
   ```
   Benchmark                                                   (numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  (totalTopicCount)  Mode  Cnt         Score         Error   Units
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm                   10000                    10                    3              12500           avgt    5   1193832.055 ±       0.015    B/op
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm                   10000                    10                    3              25000           avgt    5   2387008.169 ±       0.027    B/op
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm                   10000                    10                    3              50000           avgt    5   4773480.672 ±       0.570    B/op
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm                   10000                    10                    3             100000           avgt    5   9546305.345 ±       0.077    B/op
   ```
   
   **New Code**
   ```
   Benchmark                                                   (numReplicasPerBroker)  (partitionsPerTopic)  (replicationFactor)  (totalTopicCount)  Mode  Cnt     Score     Error   Units
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm                   10000                    10                    3              12500           avgt    5  1744.000 ±   0.001    B/op
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm                   10000                    10                    3              25000           avgt    5  1864.000 ±   0.001    B/op
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm                   10000                    10                    3              50000           avgt    5  1984.000 ±   0.001    B/op
   testTopicsDeltaSingleTopicAdd:·gc.alloc.rate.norm                   10000                    10                    3             100000           avgt    5  1984.000 ±   0.001    B/op
   ```
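The old-code numbers can be checked with a quick calculation. Dividing bytes allocated per operation by `totalTopicCount` gives a near-constant ~95.5 bytes per *existing* topic. That is consistent with the old `apply()` rebuilding both maps on every single-topic add. The new-code figures (1744 to 1984 B/op) do not scale this way. The class below is illustrative and simply reproduces that arithmetic from the old-code table:

```java
public class AllocationPerTopic {
    // totalTopicCount and gc.alloc.rate.norm (B/op) for the old code, from the table above.
    static final long[] TOPIC_COUNTS = {12_500, 25_000, 50_000, 100_000};
    static final double[] OLD_BYTES_PER_OP = {1193832.055, 2387008.169, 4773480.672, 9546305.345};

    /** Bytes allocated per single-topic add, divided by the number of topics in the image. */
    static double bytesPerExistingTopic(int i) {
        return OLD_BYTES_PER_OP[i] / TOPIC_COUNTS[i];
    }

    public static void main(String[] args) {
        for (int i = 0; i < TOPIC_COUNTS.length; i++) {
            // Prints roughly 95.51, 95.48, 95.47, 95.46
            System.out.printf("%d topics -> %.2f B per existing topic%n",
                    TOPIC_COUNTS[i], bytesPerExistingTopic(i));
        }
    }
}
```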
   
   




[GitHub] [kafka] showuon commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1112773897


##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -76,8 +84,8 @@ public TopicImage getTopic(String name) {
     }
 
     public void write(ImageWriter writer, ImageWriterOptions options) {
-        for (TopicImage topicImage : topicsById.values()) {
-            topicImage.write(writer, options);
+        for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) {
+            entry.getValue().write(writer, options);

Review Comment:
   Why can't we use `topicsById.values()` as before?



##########
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##########
@@ -38,15 +40,21 @@
  */
 public final class TopicsImage {
     public static final TopicsImage EMPTY =
-        new TopicsImage(Collections.emptyMap(), Collections.emptyMap());
+        new TopicsImage(map(), map());
+
+    final ImMap<Uuid, TopicImage> topicsById;
+    final ImMap<String, TopicImage> topicsByName;
 
-    private final Map<Uuid, TopicImage> topicsById;
-    private final Map<String, TopicImage> topicsByName;
+    public TopicsImage(ImMap<Uuid, TopicImage> topicsById,
+                       ImMap<String, TopicImage> topicsByName) {
+        this.topicsById = topicsById;
+        this.topicsByName = topicsByName;
+    }
 
-    public TopicsImage(Map<Uuid, TopicImage> topicsById,
-                       Map<String, TopicImage> topicsByName) {
-        this.topicsById = Collections.unmodifiableMap(topicsById);
-        this.topicsByName = Collections.unmodifiableMap(topicsByName);
+    public TopicsImage including(TopicImage topic) {

Review Comment:
   Looks like this is only used in tests. Could this be in `protected` scope?



##########
metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java:
##########
@@ -126,29 +127,27 @@ public void handleMetadataVersionChange(MetadataVersion newVersion) {
     }
 
     public TopicsImage apply() {
-        Map<Uuid, TopicImage> newTopicsById = new HashMap<>(image.topicsById().size());
-        Map<String, TopicImage> newTopicsByName = new HashMap<>(image.topicsByName().size());
-        for (Entry<Uuid, TopicImage> entry : image.topicsById().entrySet()) {
-            Uuid id = entry.getKey();
-            TopicImage prevTopicImage = entry.getValue();
-            TopicDelta delta = changedTopics.get(id);
-            if (delta == null) {
-                if (!deletedTopicIds.contains(id)) {
-                    newTopicsById.put(id, prevTopicImage);
-                    newTopicsByName.put(prevTopicImage.name(), prevTopicImage);
-                }
+        ImMap<Uuid, TopicImage> newTopicsById = image.topicsById;
+        ImMap<String, TopicImage> newTopicsByName = image.topicsByName;
+        // apply all the deletes
+        for (Uuid topicId: deletedTopicIds) {
+            // it was deleted, so we have to remove it from the maps
+            TopicImage originalTopicToBeDeleted = image.topicsById.get(topicId);
+            if (originalTopicToBeDeleted == null) {
+                throw new IllegalStateException("Missing topic id " + topicId);
             } else {
-                TopicImage newTopicImage = delta.apply();
-                newTopicsById.put(id, newTopicImage);
-                newTopicsByName.put(delta.name(), newTopicImage);
+                newTopicsById = newTopicsById.without(topicId);
+                newTopicsByName = newTopicsByName.without(originalTopicToBeDeleted.name());
             }
         }
-        for (Entry<Uuid, TopicDelta> entry : changedTopics.entrySet()) {
-            if (!newTopicsById.containsKey(entry.getKey())) {
-                TopicImage newTopicImage = entry.getValue().apply();
-                newTopicsById.put(newTopicImage.id(), newTopicImage);
-                newTopicsByName.put(newTopicImage.name(), newTopicImage);
-            }
+        // apply all the updates/additions
+        for (Map.Entry<Uuid, TopicDelta> entry: changedTopics.entrySet()) {
+            Uuid topicId = entry.getKey();
+            TopicImage newTopicToBeAddedOrUpdated = entry.getValue().apply();
+            // put new information into the maps
+            String topicName = newTopicToBeAddedOrUpdated.name();
+            newTopicsById = newTopicsById.assoc(topicId, newTopicToBeAddedOrUpdated);
+            newTopicsByName = newTopicsByName.assoc(topicName, newTopicToBeAddedOrUpdated);

Review Comment:
   From the [javadoc](https://javadoc.io/doc/org.organicdesign/Paguro/latest/org/organicdesign/fp/collections/ImMap.html#assoc(K,V)), I'm not sure whether the `assoc` method updates the existing entry when the key is already present, rather than adding a new one. Could you confirm that?
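The delete-then-upsert order in the new `apply()` can be sketched with plain `java.util` maps. The sketch simplifies in a few ways. Topic ids and images are reduced to strings. The `assoc`/`without` helpers are hypothetical copy-on-write stand-ins (O(n) per call) that mimic the *expected* persistent-map semantics, not Paguro's implementation. Any map honoring the `java.util.Map` contract holds at most one value per key, so `assoc` on an existing key has to replace the value. That still deserves a test against the real library. Applying deletes first also means a topic deleted and re-created under the same name ends up mapped to its new id.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DeltaApplySketch {
    /** Hypothetical copy-on-write stand-in for a persistent assoc (upsert). */
    static <K, V> Map<K, V> assoc(Map<K, V> m, K key, V value) {
        Map<K, V> copy = new HashMap<>(m);
        copy.put(key, value); // replaces any existing value for key
        return Collections.unmodifiableMap(copy);
    }

    /** Hypothetical copy-on-write stand-in for a persistent without (removal). */
    static <K, V> Map<K, V> without(Map<K, V> m, K key) {
        Map<K, V> copy = new HashMap<>(m);
        copy.remove(key);
        return Collections.unmodifiableMap(copy);
    }

    /** Simplified image: two indexes over the same topics. */
    static final class Image {
        final Map<String, String> nameById; // topic id -> topic name
        final Map<String, String> idByName; // topic name -> topic id

        Image(Map<String, String> nameById, Map<String, String> idByName) {
            this.nameById = nameById;
            this.idByName = idByName;
        }
    }

    /** Applies deletes first, then additions/updates; the input image is never modified. */
    static Image apply(Image image, Set<String> deletedIds, Map<String, String> changedNameById) {
        Map<String, String> byId = image.nameById;
        Map<String, String> byName = image.idByName;
        for (String id : deletedIds) {
            String name = image.nameById.get(id);
            if (name == null) {
                throw new IllegalStateException("Missing topic id " + id);
            }
            byId = without(byId, id);
            byName = without(byName, name);
        }
        for (Map.Entry<String, String> entry : changedNameById.entrySet()) {
            byId = assoc(byId, entry.getKey(), entry.getValue());
            byName = assoc(byName, entry.getValue(), entry.getKey());
        }
        return new Image(byId, byName);
    }

    /** Starts from {id1: foo, id2: bar}; deletes id1, re-creates "foo" as id3, updates id2. */
    static Image demo() {
        Map<String, String> byId = new HashMap<>();
        byId.put("id1", "foo");
        byId.put("id2", "bar");
        Map<String, String> byName = new HashMap<>();
        byName.put("foo", "id1");
        byName.put("bar", "id2");
        Map<String, String> changed = new HashMap<>();
        changed.put("id3", "foo");
        changed.put("id2", "bar");
        return apply(new Image(byId, byName), Collections.singleton("id1"), changed);
    }

    public static void main(String[] args) {
        Image after = demo();
        System.out.println(after.nameById.size());     // 2
        System.out.println(after.idByName.get("foo")); // id3
    }
}
```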





[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

Posted by "rondagostino (via GitHub)" <gi...@apache.org>.
rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1440262838

   @showuon Thanks for the review.  Yes, I noticed the lack of Java 8 support as well.  I tried compiling the latest version of Paguro with Java 8 and it did not work (e.g. `[ERROR]   reason: '<>' with anonymous inner classes is not supported in -source 8`).
   
   
   I will change the PRs to use the Java 8-compatible Paguro 3.1.2 instead of Paguro 3.10.3 for the moment, while researching a replacement library that supports Java 8 and extends the standard `java.util` interfaces.
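For context on the compile error quoted above: Java 9 (JEP 213) allowed the diamond operator on anonymous classes, so Java 8 sources must spell out the type argument instead. The minimal illustration below uses an arbitrary `Comparator`, not Paguro code:

```java
import java.util.Comparator;

public class Java8DiamondDemo {
    // Java 9+ only; javac with -source 8 rejects this with
    // "'<>' with anonymous inner classes is not supported in -source 8":
    //   Comparator<String> byLength = new Comparator<>() { ... };

    // Java 8-compatible form: the type argument is written out explicitly.
    static final Comparator<String> BY_LENGTH = new Comparator<String>() {
        @Override
        public int compare(String a, String b) {
            return Integer.compare(a.length(), b.length());
        }
    };

    public static void main(String[] args) {
        System.out.println(BY_LENGTH.compare("ab", "abc") < 0); // true
    }
}
```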

