You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:21 UTC

[07/50] [abbrv] kafka git commit: KAFKA-3619: File handles are leaked on .lock files of ProcessorStateManager

KAFKA-3619: File handles are leaked on .lock files of ProcessorStateManager

Kafka Streams seems to hold file handles on the `.lock` files for the state dirs, resulting in an explosion of open file handles over time. Running `lsof` shows the number of open file handles on the `.lock` files increasing rapidly over time. In a separate test project, I reproduced the issue and determined that, in order for the file handle to be relinquished, the `FileChannel` instance must be properly closed; releasing the lock alone is not enough. Applying this patch seems to resolve the issue in my job.

Author: Greg Fodor <gf...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1267 from gfodor/bug/state-lock-filehandle-leak


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/996e29cf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/996e29cf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/996e29cf

Branch: refs/heads/0.10.0
Commit: 996e29cfe8a9e5a45d4b778a84fb20479eeba469
Parents: 1b764c5
Author: Greg Fodor <gf...@gmail.com>
Authored: Mon Apr 25 13:45:51 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Apr 25 13:45:51 2016 -0700

----------------------------------------------------------------------
 .../streams/processor/internals/ProcessorStateManager.java      | 4 ++++
 .../apache/kafka/streams/processor/internals/StreamThread.java  | 1 +
 .../streams/processor/internals/ProcessorStateManagerTest.java  | 5 ++++-
 3 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/996e29cf/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 003b988..0cdf44c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -134,6 +134,9 @@ public class ProcessorStateManager {
             retry--;
             lock = lockStateDirectory(channel);
         }
+        if (lock == null) {
+            channel.close();
+        }
         return lock;
     }
 
@@ -368,6 +371,7 @@ public class ProcessorStateManager {
         } finally {
             // release the state directory directoryLock
             directoryLock.release();
+            directoryLock.channel().close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/996e29cf/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f02683e..eff90e8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -519,6 +519,7 @@ public class StreamThread extends Thread {
                                 if (directoryLock != null) {
                                     try {
                                         directoryLock.release();
+                                        directoryLock.channel().close();
                                     } catch (IOException e) {
                                         log.error("Failed to release the state directory lock");
                                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/996e29cf/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 84b59e6..e3669e8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -213,7 +213,10 @@ public class ProcessorStateManagerTest {
             try {
                 assertNotNull(lock);
             } finally {
-                if (lock != null) lock.release();
+                if (lock != null) {
+                    lock.release();
+                    lock.channel().close();
+                }
             }
         } finally {
             Utils.delete(baseDir);