Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/10 10:10:10 UTC

[GitHub] [kafka] divijvaidya opened a new pull request, #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

divijvaidya opened a new pull request, #12281:
URL: https://github.com/apache/kafka/pull/12281

   ## Problem #1 in DelegatingClassLoader.java
   An atomicity violation can occur as follows:
   Suppose thread T1 reaches line 232 but, before executing it, the scheduler context-switches to thread T2, which also reaches line 232. In that case, one thread will overwrite the value written by the other. The code fix prevents this situation.
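
To make the Problem #1 interleaving concrete, here is a minimal sketch. The class `PluginRegistrySketch`, its methods, and the `String` keys and values are hypothetical stand-ins, not the actual `DelegatingClassLoader` code. The atomic variant also uses a `ConcurrentSkipListMap` for the inner map instead of the `TreeMap` shown in the diff; that swap is an assumption made only so the sketch itself is safe to call from several threads.

```java
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;

class PluginRegistrySketch {
    // Hypothetical stand-in for the pluginLoaders map (class name -> version -> loader name).
    private final ConcurrentMap<String, SortedMap<String, String>> loaders = new ConcurrentHashMap<>();

    // Check-then-act: two threads can both observe null, and the second put()
    // then replaces the inner map that the first thread just stored.
    void addRacy(String className, String version, String loader) {
        SortedMap<String, String> inner = loaders.get(className);
        if (inner == null) {
            inner = new TreeMap<>();
            loaders.put(className, inner);
        }
        inner.put(version, loader);
    }

    // computeIfAbsent creates the inner map at most once per key, atomically.
    void addAtomic(String className, String version, String loader) {
        loaders.computeIfAbsent(className, k -> new ConcurrentSkipListMap<>()).put(version, loader);
    }

    SortedMap<String, String> loadersFor(String className) {
        return loaders.get(className);
    }

    public static void main(String[] args) {
        PluginRegistrySketch registry = new PluginRegistrySketch();
        registry.addAtomic("com.example.FooConnector", "1.0", "loaderA");
        registry.addAtomic("com.example.FooConnector", "2.0", "loaderB");
        System.out.println(registry.loadersFor("com.example.FooConnector")); // prints {1.0=loaderA, 2.0=loaderB}
    }
}
```

Whether the race in `addRacy` matters in practice depends on whether the method is ever called from more than one thread at a time.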
   
   ## Problem #2 in RocksDBMetricsRecordingTrigger.java
   An atomicity violation can occur as follows:
   Suppose thread T1 reaches line 40 but, before executing it, the scheduler context-switches to thread T2, which also reaches line 40. In a serialized execution order, thread T2 would have thrown the exception, but in this interleaving it does not. The code change fixes that.
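
The Problem #2 scenario can be sketched in the same spirit. Everything named here (`RecorderRegistrySketch`, `add`, `raceForSameName`, the exception message, the `"store-1"` key) is hypothetical and only illustrates the pattern; it is not the Streams code. Several threads try to register the same name at once, and because `putIfAbsent` checks and inserts in one atomic step, exactly one of them should succeed while the rest get the exception.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class RecorderRegistrySketch {
    private final ConcurrentMap<String, Object> recorders = new ConcurrentHashMap<>();

    // A non-null return from putIfAbsent means the name was already registered.
    void add(String name, Object recorder) {
        Object existing = recorders.putIfAbsent(name, recorder);
        if (existing != null) {
            throw new IllegalStateException("Recorder " + name + " has already been added");
        }
    }

    // Releases `threads` callers at once for the same name and returns how many succeeded.
    static int raceForSameName(int threads) {
        RecorderRegistrySketch registry = new RecorderRegistrySketch();
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);
        AtomicInteger successes = new AtomicInteger();
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                try {
                    start.await();
                    registry.add("store-1", new Object());
                    successes.incrementAndGet();
                } catch (IllegalStateException | InterruptedException e) {
                    // Lost the race (or was interrupted): not counted as a success.
                } finally {
                    done.countDown();
                }
            }).start();
        }
        start.countDown();
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(raceForSameName(8)); // prints 1
    }
}
```

With a separate `containsKey` check followed by `put`, more than one thread could pass the check before any of them inserts, so the success count would not be guaranteed to be 1.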


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya closed pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

Posted by GitBox <gi...@apache.org>.
divijvaidya closed pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2
URL: https://github.com/apache/kafka/pull/12281




[GitHub] [kafka] C0urante commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12281:
URL: https://github.com/apache/kafka/pull/12281#discussion_r921322234


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
##########
@@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
 
     public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
         final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-        if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+        final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment:
   ~Does this change behavior? Potential concurrency bug aside (which I'm not sure is actually a bug, since it's unclear if we expect this method to be called concurrently), it looks like we're going from failing _before_ overwriting values to now failing _after_ overwriting them. Is there any fallout from that or is it a benign change?~
   
   Never mind, had to refresh my understanding of `Map::putIfAbsent`. This does not change behavior.
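
For readers who want the same refresher, a small sketch of the `putIfAbsent` contract follows. The class name `PutIfAbsentSemantics` and the key and values are made up for illustration. The call stores the value only when the key is absent and otherwise returns the existing value without touching the map.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class PutIfAbsentSemantics {
    // Returns {result of first call, result of second call, value left in the map}.
    static String[] demo() {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        String first = map.putIfAbsent("recorder", "original");      // key absent: stores value, returns null
        String second = map.putIfAbsent("recorder", "replacement");  // key present: stores nothing, returns "original"
        return new String[] {String.valueOf(first), second, map.get("recorder")};
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", demo())); // prints null, original, original
    }
}
```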





[GitHub] [kafka] C0urante commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12281:
URL: https://github.com/apache/kafka/pull/12281#discussion_r921322234


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
##########
@@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
 
     public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
         final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-        if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+        final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment:
   Does this change behavior? Potential concurrency bug aside (which I'm not sure is valid, since it's unclear if we expect this method to be called concurrently), it looks like we're going from failing _before_ overwriting values to now failing _after_ overwriting them. Is there any fallout from that or is it a benign change?






[GitHub] [kafka] divijvaidya commented on pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on PR #12281:
URL: https://github.com/apache/kafka/pull/12281#issuecomment-1183040254

   @C0urante please review when you get a chance.




[GitHub] [kafka] divijvaidya commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12281:
URL: https://github.com/apache/kafka/pull/12281#discussion_r937504331


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
##########
@@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
 
     public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
         final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-        if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+        final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment:
   Thanks for the code review, folks. On deeper analysis, it doesn't look like this code will be invoked by multiple threads. I will discard this PR now. Thank you again for looking into it.





[GitHub] [kafka] divijvaidya commented on pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on PR #12281:
URL: https://github.com/apache/kafka/pull/12281#issuecomment-1152985895

   > Is the relevant code specified as thread safe?
   
   Thank you for your review, @ijuma. I appreciate it. However, I'm afraid I don't understand your question.
   
   Are you asking whether the existing code is supposed to be thread safe?
   If yes: for `DelegatingClassLoader.java`, the javadoc for the class says that it is supposed to be thread safe (but it isn't, due to the bug that is fixed in this review). For `RocksDBMetricsRecordingTrigger.java`, a thread from a metric trigger thread pool runs periodically and reads from the map maintained in the class. At the same time, another thread may be mutating the map during startup/shutdown of RocksDB, which may leave the map in an inconsistent state. Hence, it's important for this class to be thread safe as well.
   Also, note that both classes in this review use ConcurrentHashMap (albeit incorrectly) to ensure thread-safe mutation of the map.
   
   Are you asking whether the changed code is thread safe?
   If yes, the change uses atomic operations provided by ConcurrentHashMap to ensure thread safety.
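
As a rough illustration of the read-while-mutate case described above, the sketch below removes entries from a `ConcurrentHashMap` while iterating over it. The class `WeaklyConsistentIteration` and the recorder names are hypothetical. It assumes only the documented behavior that `ConcurrentHashMap` iterators are weakly consistent and do not throw `ConcurrentModificationException`; it says nothing about how the trigger thread is actually scheduled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class WeaklyConsistentIteration {
    // Removes each entry as it is visited; returns {entries visited, entries left}.
    static int[] drainWhileIterating() {
        Map<String, Integer> recorders = new ConcurrentHashMap<>();
        for (int i = 0; i < 5; i++) {
            recorders.put("recorder-" + i, i);
        }
        int visited = 0;
        for (String name : recorders.keySet()) {
            recorders.remove(name); // allowed mid-iteration: the iterator is not fail-fast
            visited++;
        }
        return new int[] {visited, recorders.size()};
    }

    public static void main(String[] args) {
        int[] result = drainWhileIterating();
        System.out.println(result[0] + " visited, " + result[1] + " left"); // prints 5 visited, 0 left
    }
}
```

Individual operations on the map are safe in this sense, but a compound sequence such as "check, then insert" is only atomic when it goes through a single call like `putIfAbsent` or `computeIfAbsent`.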




[GitHub] [kafka] ijuma commented on pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #12281:
URL: https://github.com/apache/kafka/pull/12281#issuecomment-1152947936

   Is the relevant code specified as thread safe?




[GitHub] [kafka] C0urante commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12281:
URL: https://github.com/apache/kafka/pull/12281#discussion_r896319034


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -222,14 +222,10 @@ protected PluginClassLoader newPluginClassLoader(
     private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
         for (PluginDesc<T> plugin : plugins) {
             String pluginClassName = plugin.className();
-            SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
-            if (inner == null) {
-                inner = new TreeMap<>();
-                pluginLoaders.put(pluginClassName, inner);
-                // TODO: once versioning is enabled this line should be moved outside this if branch
+            pluginLoaders.computeIfAbsent(pluginClassName, k -> {

Review Comment:
   Is this actually done concurrently? I was under the impression that this logic was all handled on a single thread.
   
   The map may be a `ConcurrentMap` in order to ensure that we can have one writer and an arbitrary number of readers at a single time, which might come in handy if a class has static initialization logic that spawns other threads which in turn make implicit calls to `DelegatingClassLoader::loadClass`.





[GitHub] [kafka] C0urante commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12281:
URL: https://github.com/apache/kafka/pull/12281#discussion_r921456034


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
##########
@@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
 
     public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
         final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-        if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+        final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment:
   This is a nice improvement in readability, but are we certain it's necessary? I commented on the change to the plugin scanning logic in Connect because I'm familiar with that part of the code base; I don't have the same familiarity with Streams, though.
   
   I think it's fine to merge this change, but if this method isn't intended to be invoked concurrently, we should modify the PR title so that the commit message doesn't imply this is a bug fix and instead recognizes it as a cosmetic improvement.





[GitHub] [kafka] mdedetrich commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

Posted by GitBox <gi...@apache.org>.
mdedetrich commented on code in PR #12281:
URL: https://github.com/apache/kafka/pull/12281#discussion_r937115822


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecordingTrigger.java:
##########
@@ -32,12 +32,12 @@ public RocksDBMetricsRecordingTrigger(final Time time) {
 
     public void addMetricsRecorder(final RocksDBMetricsRecorder metricsRecorder) {
         final String metricsRecorderName = metricsRecorderName(metricsRecorder);
-        if (metricsRecordersToTrigger.containsKey(metricsRecorderName)) {
+        final RocksDBMetricsRecorder existingRocksDBMetricsRecorder = metricsRecordersToTrigger.putIfAbsent(metricsRecorderName, metricsRecorder);

Review Comment:
   I agree with @C0urante here. If the method isn't meant to be invoked concurrently, then we should either revert to the original `containsKey` or add a comment along the lines of "Currently this code isn't called in a concurrent situation".





[GitHub] [kafka] divijvaidya commented on a diff in pull request #12281: KAFKA-13971: Fix atomicity violations caused by improper usage of ConcurrentHashMap - part2

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12281:
URL: https://github.com/apache/kafka/pull/12281#discussion_r903755896


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -222,14 +222,10 @@ protected PluginClassLoader newPluginClassLoader(
     private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
         for (PluginDesc<T> plugin : plugins) {
             String pluginClassName = plugin.className();
-            SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
-            if (inner == null) {
-                inner = new TreeMap<>();
-                pluginLoaders.put(pluginClassName, inner);
-                // TODO: once versioning is enabled this line should be moved outside this if branch
+            pluginLoaders.computeIfAbsent(pluginClassName, k -> {

Review Comment:
   OK, that is fair. I will revert this change from this PR.


