You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/10/20 17:07:17 UTC

[kafka] branch 2.4 updated: KAFKA-10426: Deadlock in DistributedHerder during session key update. (#9431)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new de71460  KAFKA-10426: Deadlock in DistributedHerder during session key update. (#9431)
de71460 is described below

commit de71460249a464ba6a257e958ee097d795879dda
Author: xakassi <xa...@mail.ru>
AuthorDate: Tue Oct 20 20:05:30 2020 +0400

    KAFKA-10426: Deadlock in DistributedHerder during session key update. (#9431)
    
    DistributedHerder enters the synchronized method updateConfigsWithIncrementalCooperative() and calls configBackingStore.snapshot(), which takes a lock on an internal object in the KafkaConfigBackingStore class.
    
    Meanwhile, the ConsumeCallback in KafkaConfigBackingStore, inside a synchronized block on that internal object, receives a SESSION_KEY record and calls updateListener.onSessionKeyUpdate(), which takes a lock on DistributedHerder.
    
    This results in a deadlock.
    
    To avoid this, updateListener should be called with the new session key outside the synchronized block, as is already done, for example, for updateListener.onTaskConfigUpdate(updatedTasks).
    
    Co-authored-by: Taisiia Goltseva <ta...@netcracker.com>
    
    Reviewers: Konstantine Karantasis <k....@gmail.com>
---
 .../connect/storage/KafkaConfigBackingStore.java   | 60 +++++++++++-----------
 1 file changed, 29 insertions(+), 31 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 4c6429c..e279ed0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -680,44 +680,42 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                 if (started)
                     updateListener.onTaskConfigUpdate(updatedTasks);
             } else if (record.key().equals(SESSION_KEY_KEY)) {
-                synchronized (lock) {
-                    if (value.value() == null) {
-                        log.error("Ignoring session key because it is unexpectedly null");
-                        return;
-                    }
-                    if (!(value.value() instanceof Map)) {
-                        log.error("Ignoring session key because the value is not a Map but is {}", value.value().getClass());
-                        return;
-                    }
+                if (value.value() == null) {
+                    log.error("Ignoring session key because it is unexpectedly null");
+                    return;
+                }
+                if (!(value.value() instanceof Map)) {
+                    log.error("Ignoring session key because the value is not a Map but is {}", value.value().getClass());
+                    return;
+                }
 
-                    Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
+                Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
 
-                    Object sessionKey = valueAsMap.get("key");
-                    if (!(sessionKey instanceof String)) {
-                        log.error("Invalid data for session key 'key' field should be a String but is {}", sessionKey.getClass());
-                        return;
-                    }
-                    byte[] key = Base64.getDecoder().decode((String) sessionKey);
+                Object sessionKey = valueAsMap.get("key");
+                if (!(sessionKey instanceof String)) {
+                    log.error("Invalid data for session key 'key' field should be a String but is {}", sessionKey.getClass());
+                    return;
+                }
+                byte[] key = Base64.getDecoder().decode((String) sessionKey);
 
-                    Object keyAlgorithm = valueAsMap.get("algorithm");
-                    if (!(keyAlgorithm instanceof String)) {
-                        log.error("Invalid data for session key 'algorithm' field should be a String but it is {}", keyAlgorithm.getClass());
-                        return;
-                    }
+                Object keyAlgorithm = valueAsMap.get("algorithm");
+                if (!(keyAlgorithm instanceof String)) {
+                    log.error("Invalid data for session key 'algorithm' field should be a String but it is {}", keyAlgorithm.getClass());
+                    return;
+                }
 
-                    Object creationTimestamp = valueAsMap.get("creation-timestamp");
-                    if (!(creationTimestamp instanceof Long)) {
-                        log.error("Invalid data for session key 'creation-timestamp' field should be a long but it is {}", creationTimestamp.getClass());
-                        return;
-                    }
-                    KafkaConfigBackingStore.this.sessionKey = new SessionKey(
+                Object creationTimestamp = valueAsMap.get("creation-timestamp");
+                if (!(creationTimestamp instanceof Long)) {
+                    log.error("Invalid data for session key 'creation-timestamp' field should be a long but it is {}", creationTimestamp.getClass());
+                    return;
+                }
+                KafkaConfigBackingStore.this.sessionKey = new SessionKey(
                         new SecretKeySpec(key, (String) keyAlgorithm),
                         (long) creationTimestamp
-                    );
+                );
 
-                    if (started)
-                        updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);
-                }
+                if (started)
+                    updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);
             } else {
                 log.error("Discarding config update record with invalid key: {}", record.key());
             }