You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/10/20 17:05:21 UTC
[kafka] branch 2.7 updated: KAFKA-10426: Deadlock in
DistributedHerder during session key update. (#9431)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 3ee311e KAFKA-10426: Deadlock in DistributedHerder during session key update. (#9431)
3ee311e is described below
commit 3ee311ecf90f124e98970295046336119aa058d6
Author: xakassi <xa...@mail.ru>
AuthorDate: Tue Oct 20 20:05:30 2020 +0400
KAFKA-10426: Deadlock in DistributedHerder during session key update. (#9431)
DistributedHerder goes to updateConfigsWithIncrementalCooperative() synchronized method and called configBackingStore.snapshot() which take a lock on internal object in KafkaConfigBackingStore class.
Meanwhile KafkaConfigBackingStore in ConsumeCallback inside synchronized block on internal object gets SESSION_KEY record and calls updateListener.onSessionKeyUpdate() which take a lock on DistributedHerder.
This results to a deadlock.
To avoid this, updateListener with new session key should be called outside synchronized block as it's done, for example, for updateListener.onTaskConfigUpdate(updatedTasks).
Co-authored-by: Taisiia Goltseva <ta...@netcracker.com>
Reviewers: Konstantine Karantasis <k....@gmail.com>
---
.../connect/storage/KafkaConfigBackingStore.java | 60 +++++++++++-----------
1 file changed, 29 insertions(+), 31 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 29299a8..fbcc35b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -699,44 +699,42 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
if (started)
updateListener.onTaskConfigUpdate(updatedTasks);
} else if (record.key().equals(SESSION_KEY_KEY)) {
- synchronized (lock) {
- if (value.value() == null) {
- log.error("Ignoring session key because it is unexpectedly null");
- return;
- }
- if (!(value.value() instanceof Map)) {
- log.error("Ignoring session key because the value is not a Map but is {}", value.value().getClass());
- return;
- }
+ if (value.value() == null) {
+ log.error("Ignoring session key because it is unexpectedly null");
+ return;
+ }
+ if (!(value.value() instanceof Map)) {
+ log.error("Ignoring session key because the value is not a Map but is {}", value.value().getClass());
+ return;
+ }
- Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
+ Map<String, Object> valueAsMap = (Map<String, Object>) value.value();
- Object sessionKey = valueAsMap.get("key");
- if (!(sessionKey instanceof String)) {
- log.error("Invalid data for session key 'key' field should be a String but is {}", sessionKey.getClass());
- return;
- }
- byte[] key = Base64.getDecoder().decode((String) sessionKey);
+ Object sessionKey = valueAsMap.get("key");
+ if (!(sessionKey instanceof String)) {
+ log.error("Invalid data for session key 'key' field should be a String but is {}", sessionKey.getClass());
+ return;
+ }
+ byte[] key = Base64.getDecoder().decode((String) sessionKey);
- Object keyAlgorithm = valueAsMap.get("algorithm");
- if (!(keyAlgorithm instanceof String)) {
- log.error("Invalid data for session key 'algorithm' field should be a String but it is {}", keyAlgorithm.getClass());
- return;
- }
+ Object keyAlgorithm = valueAsMap.get("algorithm");
+ if (!(keyAlgorithm instanceof String)) {
+ log.error("Invalid data for session key 'algorithm' field should be a String but it is {}", keyAlgorithm.getClass());
+ return;
+ }
- Object creationTimestamp = valueAsMap.get("creation-timestamp");
- if (!(creationTimestamp instanceof Long)) {
- log.error("Invalid data for session key 'creation-timestamp' field should be a long but it is {}", creationTimestamp.getClass());
- return;
- }
- KafkaConfigBackingStore.this.sessionKey = new SessionKey(
+ Object creationTimestamp = valueAsMap.get("creation-timestamp");
+ if (!(creationTimestamp instanceof Long)) {
+ log.error("Invalid data for session key 'creation-timestamp' field should be a long but it is {}", creationTimestamp.getClass());
+ return;
+ }
+ KafkaConfigBackingStore.this.sessionKey = new SessionKey(
new SecretKeySpec(key, (String) keyAlgorithm),
(long) creationTimestamp
- );
+ );
- if (started)
- updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);
- }
+ if (started)
+ updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);
} else {
log.error("Discarding config update record with invalid key: {}", record.key());
}