Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/09 16:41:00 UTC

[GitHub] [kafka] Kvicii opened a new pull request, #12277: ensure thread safe

Kvicii opened a new pull request, #12277:
URL: https://github.com/apache/kafka/pull/12277

   ensure thread safe
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12277: KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12277:
URL: https://github.com/apache/kafka/pull/12277#discussion_r894298629


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java:
##########
@@ -81,7 +81,7 @@ private void scheduleReload(String connectorName, String path, long ttl) {
         Map<String, HerderRequest> connectorRequests = requests.get(connectorName);
         if (connectorRequests == null) {
             connectorRequests = new ConcurrentHashMap<>();
-            requests.put(connectorName, connectorRequests);
+            requests.putIfAbsent(connectorName, connectorRequests);

Review Comment:
   You should probably update this code as follows:
   ```
       private void scheduleReload(String connectorName, String path, long ttl) {
           final Map<String, HerderRequest> connectorRequests = requests.computeIfAbsent(connectorName, (c) -> new ConcurrentHashMap<>());
           connectorRequests.computeIfPresent(path, (p, previousRequest) -> {
               // Delete previous request for ttl which is now stale
               previousRequest.cancel();
   
               log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
               Callback<Void> cb = (error, result) -> {
                   if (error != null) {
                       log.error("Unexpected error during connector restart: ", error);
                   }
               };
               return worker.herder().restartConnector(ttl, connectorName, cb);
           });
       }
   ```
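
A note on the suggestion above: `computeIfPresent` only invokes its remapping function when the key already maps to a value, so for a path with no earlier request nothing would be scheduled. The minimal sketch below illustrates that behaviour with plain strings; the class name, method names, and keys are hypothetical and stand in for the connector types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ComputeIfPresentDemo {
    /** Tries to "schedule" a value with computeIfPresent on an empty map. */
    static Map<String, String> scheduleWithComputeIfPresent() {
        Map<String, String> byPath = new ConcurrentHashMap<>();
        // No mapping exists for "path", so the function is never called.
        byPath.computeIfPresent("path", (p, previous) -> "request-1");
        return byPath;
    }

    /** Same call with compute, which runs whether or not a mapping exists. */
    static Map<String, String> scheduleWithCompute() {
        Map<String, String> byPath = new ConcurrentHashMap<>();
        // previous is null on the first call; the returned value is stored.
        byPath.compute("path", (p, previous) -> "request-1");
        return byPath;
    }

    public static void main(String[] args) {
        System.out.println(scheduleWithComputeIfPresent()); // prints {}
        System.out.println(scheduleWithCompute());          // prints {path=request-1}
    }
}
```

With `compute`, the function receives `null` for an absent key, which is why a null check on the previous value is needed before calling `cancel()`.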





[GitHub] [kafka] divijvaidya commented on a diff in pull request #12277: KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12277:
URL: https://github.com/apache/kafka/pull/12277#discussion_r894346582


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java:
##########
@@ -81,7 +81,7 @@ private void scheduleReload(String connectorName, String path, long ttl) {
         Map<String, HerderRequest> connectorRequests = requests.get(connectorName);
         if (connectorRequests == null) {
             connectorRequests = new ConcurrentHashMap<>();
-            requests.put(connectorName, connectorRequests);
+            requests.putIfAbsent(connectorName, connectorRequests);

Review Comment:
   You might want to use code similar to the following:
   ```
       private void scheduleReload(String connectorName, String path, long ttl) {
           final Map<String, HerderRequest> connectorRequests = requests.computeIfAbsent(connectorName, (c) -> new ConcurrentHashMap<>());
           connectorRequests.compute(path, (p, previousRequest) -> {
               if (previousRequest != null) {
                   // Delete previous request for ttl which is now stale
                   previousRequest.cancel();
               }
   
               log.info("Scheduling a restart of connector {} in {} ms", connectorName, ttl);
               Callback<Void> cb = (error, result) -> {
                   if (error != null) {
                       log.error("Unexpected error during connector restart: ", error);
                   }
               };
               return worker.herder().restartConnector(ttl, connectorName, cb);
           });
       }
   ```
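
To illustrate why the `computeIfAbsent` plus `compute` combination closes the race, here is a small stress sketch, not the Kafka code itself. `FakeRequest` is a hypothetical stand-in for `HerderRequest`, and the keys and thread count are arbitrary. Because `ConcurrentHashMap.compute` runs atomically per key, each request is cancelled exactly once when it is replaced, so N concurrent calls yield N - 1 cancellations.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class AtomicReplaceDemo {
    /** Hypothetical stand-in for HerderRequest; counts each cancellation. */
    static final class FakeRequest {
        private final AtomicInteger cancelCounter;
        FakeRequest(AtomicInteger cancelCounter) { this.cancelCounter = cancelCounter; }
        void cancel() { cancelCounter.incrementAndGet(); }
    }

    /** Runs concurrent schedule calls for one path; returns total cancellations. */
    static int runConcurrently(int threads) {
        Map<String, Map<String, FakeRequest>> requests = new ConcurrentHashMap<>();
        AtomicInteger cancelled = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all workers together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                requests.computeIfAbsent("connector", c -> new ConcurrentHashMap<>())
                        .compute("path", (p, previous) -> {
                            if (previous != null) {
                                previous.cancel(); // cancel the request being replaced
                            }
                            return new FakeRequest(cancelled);
                        });
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return cancelled.get();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(8)); // prints 7
    }
}
```

One design consideration: the `ConcurrentHashMap` Javadoc advises keeping the function passed to `compute` short and simple, since other updates to the map may block while it runs. Whether calling into the herder from inside the lambda is acceptable is a judgment call for the PR.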





[GitHub] [kafka] ijuma commented on pull request #12277: KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #12277:
URL: https://github.com/apache/kafka/pull/12277#issuecomment-1210721500

   We should check if this code is used in a concurrent context. Will leave it to @C0urante.




[GitHub] [kafka] Kvicii commented on pull request #12277: KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap

Posted by GitBox <gi...@apache.org>.
Kvicii commented on PR #12277:
URL: https://github.com/apache/kafka/pull/12277#issuecomment-1160526400

   @hachikuji can you help me review this PR?




[GitHub] [kafka] divijvaidya commented on a diff in pull request #12277: KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on code in PR #12277:
URL: https://github.com/apache/kafka/pull/12277#discussion_r894295650


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java:
##########
@@ -81,7 +81,7 @@ private void scheduleReload(String connectorName, String path, long ttl) {
         Map<String, HerderRequest> connectorRequests = requests.get(connectorName);
         if (connectorRequests == null) {
             connectorRequests = new ConcurrentHashMap<>();
-            requests.put(connectorName, connectorRequests);
+            requests.putIfAbsent(connectorName, connectorRequests);

Review Comment:
   This is still incorrect. Consider the following scenario:
   
   Thread T1 arrives and reaches line 83. A context switch to thread T2 occurs.
   T2 arrives and also reaches line 83. A context switch back to thread T1 occurs.
   T1 executes line 99 and adds an entry. A context switch to thread T2 occurs.
   Under serialized execution, T2 would have taken the else block, cancelled the previous request, and then added the new request. Instead, T2 simply overwrites the request at line 99 without having cancelled the previous one.
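
The interleaving above can be replayed deterministically on a single thread. This is a sketch under assumptions: it presumes the method has the get, null check, else-cancel, then put shape implied by the diff and this comment, and `FakeRequest`, the keys, and the method names are hypothetical. It contrasts the check-then-act version (with `putIfAbsent`) against an atomic `computeIfAbsent` plus `compute` version.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class CheckThenActRace {
    /** Hypothetical stand-in for HerderRequest; only tracks cancellation. */
    static final class FakeRequest {
        volatile boolean cancelled = false;
        void cancel() { cancelled = true; }
    }

    /** Replays the T1/T2 interleaving; returns whether T1's request got cancelled. */
    static boolean replayCheckThenAct() {
        Map<String, Map<String, FakeRequest>> requests = new ConcurrentHashMap<>();

        // Both threads read before either writes, so both observe null.
        Map<String, FakeRequest> t1Map = requests.get("connector");
        Map<String, FakeRequest> t2Map = requests.get("connector");

        // T1 takes the null branch and registers its request.
        FakeRequest first = new FakeRequest();
        if (t1Map == null) {
            t1Map = new ConcurrentHashMap<>();
            requests.putIfAbsent("connector", t1Map);
        }
        t1Map.put("path", first);

        // T2 resumes with its stale null and also takes the null branch,
        // so the else branch that cancels the previous request never runs.
        FakeRequest second = new FakeRequest();
        if (t2Map == null) {
            t2Map = new ConcurrentHashMap<>();
            requests.putIfAbsent("connector", t2Map); // no effect: T1's map is kept
        } else {
            FakeRequest previous = t2Map.get("path");
            if (previous != null) {
                previous.cancel();
            }
        }
        t2Map.put("path", second);
        return first.cancelled;
    }

    /** Atomic variant: lookup, cancel, and replace happen in one compute call. */
    static void scheduleAtomically(Map<String, Map<String, FakeRequest>> requests,
                                   FakeRequest next) {
        requests.computeIfAbsent("connector", c -> new ConcurrentHashMap<>())
                .compute("path", (p, previous) -> {
                    if (previous != null) {
                        previous.cancel();
                    }
                    return next;
                });
    }

    /** Schedules two requests atomically; returns whether the first got cancelled. */
    static boolean replayAtomic() {
        Map<String, Map<String, FakeRequest>> requests = new ConcurrentHashMap<>();
        FakeRequest first = new FakeRequest();
        scheduleAtomically(requests, first);
        scheduleAtomically(requests, new FakeRequest());
        return first.cancelled;
    }

    public static void main(String[] args) {
        System.out.println(replayCheckThenAct()); // prints false
        System.out.println(replayAtomic());       // prints true
    }
}
```

In the check-then-act replay the first request is never cancelled, which is the leak the comment describes. In the atomic variant there is no window between the read and the write for another thread to slip into.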





[GitHub] [kafka] C0urante merged pull request #12277: KAFKA-13971:Atomicity violations caused by improper usage of ConcurrentHashMap

Posted by GitBox <gi...@apache.org>.
C0urante merged PR #12277:
URL: https://github.com/apache/kafka/pull/12277

