You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/03/30 19:15:14 UTC
[kafka] branch trunk updated: KAFKA-12509 Tighten up StateDirectory thread locking (#10418)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 617ee00 KAFKA-12509 Tighten up StateDirectory thread locking (#10418)
617ee00 is described below
commit 617ee003223873da2c1c7f383d3416684c2d84c8
Author: ketulgupta1995 <ke...@gmail.com>
AuthorDate: Wed Mar 31 00:43:53 2021 +0530
KAFKA-12509 Tighten up StateDirectory thread locking (#10418)
Modified the LockAndOwner class to hold a reference to the owning Thread instead of just the thread's name.
Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
.../kafka/streams/processor/internals/StateDirectory.java | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 01f62e7..1b745a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -96,9 +96,8 @@ public class StateDirectory {
private static class LockAndOwner {
final FileLock lock;
- final String owningThread;
-
- LockAndOwner(final String owningThread, final FileLock lock) {
+ final Thread owningThread;
+ LockAndOwner(final Thread owningThread, final FileLock lock) {
this.owningThread = owningThread;
this.lock = lock;
}
@@ -298,7 +297,7 @@ public class StateDirectory {
final File lockFile;
// we already have the lock so bail out here
final LockAndOwner lockAndOwner = locks.get(taskId);
- if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+ if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread())) {
log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
return true;
} else if (lockAndOwner != null) {
@@ -327,7 +326,7 @@ public class StateDirectory {
final FileLock lock = tryLock(channel);
if (lock != null) {
- locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock));
+ locks.put(taskId, new LockAndOwner(Thread.currentThread(), lock));
log.debug("{} Acquired state dir lock for task {}", logPrefix(), taskId);
}
@@ -384,7 +383,7 @@ public class StateDirectory {
*/
synchronized void unlock(final TaskId taskId) throws IOException {
final LockAndOwner lockAndOwner = locks.get(taskId);
- if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
+ if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread())) {
locks.remove(taskId);
lockAndOwner.lock.release();
log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);