You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/03/30 19:15:14 UTC

[kafka] branch trunk updated: KAFKA-12509 Tighten up StateDirectory thread locking (#10418)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 617ee00  KAFKA-12509 Tighten up StateDirectory thread locking (#10418)
617ee00 is described below

commit 617ee003223873da2c1c7f383d3416684c2d84c8
Author: ketulgupta1995 <ke...@gmail.com>
AuthorDate: Wed Mar 31 00:43:53 2021 +0530

    KAFKA-12509 Tighten up StateDirectory thread locking (#10418)
    
    Modified the LockAndOwner class to hold a reference to the owning Thread instead of just the thread's name
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../kafka/streams/processor/internals/StateDirectory.java     | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 01f62e7..1b745a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -96,9 +96,8 @@ public class StateDirectory {
 
     private static class LockAndOwner {
         final FileLock lock;
-        final String owningThread;
-
-        LockAndOwner(final String owningThread, final FileLock lock) {
+        final Thread owningThread;
+        LockAndOwner(final Thread owningThread, final FileLock lock) {
             this.owningThread = owningThread;
             this.lock = lock;
         }
@@ -298,7 +297,7 @@ public class StateDirectory {
         final File lockFile;
         // we already have the lock so bail out here
         final LockAndOwner lockAndOwner = locks.get(taskId);
-        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread())) {
             log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
             return true;
         } else if (lockAndOwner != null) {
@@ -327,7 +326,7 @@ public class StateDirectory {
 
         final FileLock lock = tryLock(channel);
         if (lock != null) {
-            locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock));
+            locks.put(taskId, new LockAndOwner(Thread.currentThread(), lock));
 
             log.debug("{} Acquired state dir lock for task {}", logPrefix(), taskId);
         }
@@ -384,7 +383,7 @@ public class StateDirectory {
      */
     synchronized void unlock(final TaskId taskId) throws IOException {
         final LockAndOwner lockAndOwner = locks.get(taskId);
-        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread())) {
             locks.remove(taskId);
             lockAndOwner.lock.release();
             log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);