You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/25 22:45:54 UTC
kafka git commit: KAFKA-3619: File handles are leaked on .lock files
of ProcessorStateManager
Repository: kafka
Updated Branches:
refs/heads/trunk 1b764c5e8 -> 996e29cfe
KAFKA-3619: File handles are leaked on .lock files of ProcessorStateManager
Kafka Streams seems to hold file handles on the `.lock` files for the state dirs, resulting in an explosion of filehandles over time. Running `lsof` shows the number of open filehandles on the `.lock` file increasing rapidly over time. In a separate test project, I reproduced the issue and determined that in order for the filehandle to be relinquished the `FileChannel` instance must be properly closed. Applying this patch seems to resolve the issue in my job.
Author: Greg Fodor <gf...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1267 from gfodor/bug/state-lock-filehandle-leak
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/996e29cf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/996e29cf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/996e29cf
Branch: refs/heads/trunk
Commit: 996e29cfe8a9e5a45d4b778a84fb20479eeba469
Parents: 1b764c5
Author: Greg Fodor <gf...@gmail.com>
Authored: Mon Apr 25 13:45:51 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Apr 25 13:45:51 2016 -0700
----------------------------------------------------------------------
.../streams/processor/internals/ProcessorStateManager.java | 4 ++++
.../apache/kafka/streams/processor/internals/StreamThread.java | 1 +
.../streams/processor/internals/ProcessorStateManagerTest.java | 5 ++++-
3 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/996e29cf/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 003b988..0cdf44c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -134,6 +134,9 @@ public class ProcessorStateManager {
retry--;
lock = lockStateDirectory(channel);
}
+ if (lock == null) {
+ channel.close();
+ }
return lock;
}
@@ -368,6 +371,7 @@ public class ProcessorStateManager {
} finally {
// release the state directory directoryLock
directoryLock.release();
+ directoryLock.channel().close();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/996e29cf/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f02683e..eff90e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -519,6 +519,7 @@ public class StreamThread extends Thread {
if (directoryLock != null) {
try {
directoryLock.release();
+ directoryLock.channel().close();
} catch (IOException e) {
log.error("Failed to release the state directory lock");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/996e29cf/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 84b59e6..e3669e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -213,7 +213,10 @@ public class ProcessorStateManagerTest {
try {
assertNotNull(lock);
} finally {
- if (lock != null) lock.release();
+ if (lock != null) {
+ lock.release();
+ lock.channel().close();
+ }
}
} finally {
Utils.delete(baseDir);