Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/25 21:20:00 UTC

[jira] [Commented] (KAFKA-4455) CommitFailedException during rebalance doesn't release resources in tasks/processors

    [ https://issues.apache.org/jira/browse/KAFKA-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376245#comment-16376245 ] 

ASF GitHub Bot commented on KAFKA-4455:
---------------------------------------

hachikuji closed pull request #2188: KAFKA-4455; ensure tasks are closed after CommitFailedException
URL: https://github.com/apache/kafka/pull/2188

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a135a158fea..d75922dec77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -311,9 +311,8 @@ private void shutdownTasksAndState(final boolean rethrowExceptions) {
             activeTasks.keySet(), standbyTasks.keySet());
 
         // Commit first as there may be cached records that have not been flushed yet.
-        commitOffsets(rethrowExceptions);
         // Close all processors in topology order
-        closeAllTasks();
+        commitOffsetsAndCloseAll(rethrowExceptions, false);
         // flush state
         flushAllState(rethrowExceptions);
         // flush out any extra data sent during close
@@ -324,7 +323,6 @@ private void shutdownTasksAndState(final boolean rethrowExceptions) {
         unAssignChangeLogPartitions(rethrowExceptions);
     }
 
-
     /**
      * Similar to shutdownTasksAndState, however does not close the task managers,
      * in the hope that soon the tasks will be assigned again
@@ -335,9 +333,8 @@ private void suspendTasksAndState(final boolean rethrowExceptions) {
             activeTasks.keySet(), standbyTasks.keySet());
 
         // Commit first as there may be cached records that have not been flushed yet.
-        commitOffsets(rethrowExceptions);
         // Close all topology nodes
-        closeAllTasksTopologies();
+        commitOffsetsAndCloseAll(rethrowExceptions, true);
         // flush state
         flushAllState(rethrowExceptions);
         // flush out any extra data sent during close
@@ -349,6 +346,54 @@ private void suspendTasksAndState(final boolean rethrowExceptions) {
 
     }
 
+    /**
+     * Commits offsets and closes all tasks or topologies, but ensures close is called upon {@link CommitFailedException}.
+     * @param rethrowExceptions
+     * @param closeTopology if <code>true</code> method {@link #closeAllTasksTopologies()} will be called. If <code>false</code>, method {@link #closeAllTasks()} will be called.
+     */
+    private void commitOffsetsAndCloseAll(boolean rethrowExceptions, boolean closeTopology) {
+        try {
+            // Commit first as there may be cached records that have not been flushed yet.
+            commitOffsets(rethrowExceptions);
+        } catch (CommitFailedException e) {
+            log.info(String.format("%s Closing all tasks after %s: ", logPrefix, e.getClass().getSimpleName()), e);
+
+            // we must close all tasks because of resources held by Processors. If the method:
+            // org.apache.kafka.streams.processor.Processor#close is not called, we could leave any resources
+            // in Processors to be in an unclean state
+
+            // For example in KAFKA-4455: this resulted in filesystem LOCKs are not released, which caused
+            // org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available
+            // when the next time we tried to open RocksDB state stores
+
+            // Close all topology nodes or
+            // Close all processors in topology order
+            closeAllTasksOrTaskTopologies(closeTopology);
+
+            if (rethrowExceptions) {
+                throw e;
+            }
+
+            return;
+        }
+
+        // Close all topology nodes or
+        // Close all processors in topology order
+        closeAllTasksOrTaskTopologies(closeTopology);
+    }
+
+    /**
+     *
+     * @param closeTopology if <code>true</code> method {@link #closeAllTasksTopologies()} will be called. If <code>false</code>, method {@link #closeAllTasks()} will be called.
+     */
+    private void closeAllTasksOrTaskTopologies(boolean closeTopology) {
+        if (closeTopology) {
+            closeAllTasksTopologies();
+        } else {
+            closeAllTasks();
+        }
+    }
+
     interface AbstractTaskAction {
         void apply(final AbstractTask task);
     }
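
The control flow the patch introduces (commit first, but close tasks even when the commit fails) can be sketched in isolation. The following is a minimal, self-contained illustration and not the StreamThread code: the class CommitThenCloseSketch, the nested Task and CommitFailed types, and the failCommit switch are hypothetical stand-ins. It also uses try/finally instead of the patch's explicit catch-and-close, so unlike the patch it closes tasks after any exception from the commit, not only after a commit failure.

```java
import java.util.ArrayList;
import java.util.List;

class CommitThenCloseSketch {

    /** Hypothetical stand-in for the exception raised when a commit fails during a rebalance. */
    static class CommitFailed extends RuntimeException {
        CommitFailed(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a task that holds resources (for example a state store lock). */
    static class Task {
        final String name;
        boolean closed = false;

        Task(String name) {
            this.name = name;
        }

        void close() {
            closed = true;
        }
    }

    final List<Task> tasks = new ArrayList<>();
    boolean failCommit = false; // test switch, not part of any real API

    void commitOffsets() {
        if (failCommit) {
            throw new CommitFailed("commit failed during rebalance");
        }
    }

    void closeAllTasks() {
        for (Task task : tasks) {
            task.close();
        }
    }

    /** Commits, then closes all tasks; the close also runs when the commit throws. */
    void commitOffsetsAndCloseAll(boolean rethrowExceptions) {
        try {
            commitOffsets();
        } catch (CommitFailed e) {
            if (rethrowExceptions) {
                throw e; // the finally block below still runs before the exception propagates
            }
            // otherwise the failure is swallowed (the real patch logs it at this point)
        } finally {
            closeAllTasks();
        }
    }

    public static void main(String[] args) {
        CommitThenCloseSketch thread = new CommitThenCloseSketch();
        thread.tasks.add(new Task("0_0"));
        thread.tasks.add(new Task("0_1"));
        thread.failCommit = true;
        thread.commitOffsetsAndCloseAll(false);
        for (Task task : thread.tasks) {
            System.out.println(task.name + " closed=" + task.closed);
        }
    }
}
```

Whether closing after every kind of commit exception is desirable is a design choice; the patch above deliberately limits it to CommitFailedException.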


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> CommitFailedException during rebalance doesn't release resources in tasks/processors
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4455
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4455
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>         Environment: Kafka Streams was running on CentOS. I observed that after some time the locks were released even though the JVM process was not restarted, so I guess CentOS has some lock-cleaning policy.
>            Reporter: Davor Poldrugo
>            Assignee: Ewen Cheslack-Postava
>            Priority: Major
>              Labels: stacktrace
>             Fix For: 0.10.2.0
>
>         Attachments: RocksDBException_IO-error_stacktrace.txt
>
>
> h2. Problem description
> From time to time a rebalance in Kafka Streams causes the commit to throw CommitFailedException. When this exception is thrown, the tasks and processors are not closed. If a processor contains a state store (RocksDB), the RocksDB instance is not closed, so its LOCK files are not released at the OS level. When the Kafka Streams app then tries to open the tasks and their respective processors and state stores, {{org.rocksdb.RocksDBException: IO error: lock .../LOCK: No locks available}} is thrown. If the JVM process is restarted, the locks are released.
> h2. Additional info
> I have been running 3 Kafka Streams instances on separate machines with {{num.stream.threads=1}}, each with its own state directory. Other Kafka Streams apps were running on the same machines, but they had separate directories for state stores. In the attached logs you can see {{StreamThread-1895}}. I'm not running 1895 StreamThreads; I have implemented a Kafka Streams restart policy in my {{UncaughtExceptionHandler}} which, on some transient exceptions, restarts the {{org.apache.kafka.streams.KafkaStreams}} topology by calling {{org.apache.kafka.streams.KafkaStreams.stop()}} and then {{org.apache.kafka.streams.KafkaStreams.start()}}. This causes the thread names to have bigger numbers.
> h2. Stacktrace
> [^RocksDBException_IO-error_stacktrace.txt] 
> h2. Suggested solution
> To avoid restarting the JVM, modify Kafka Streams to close tasks, which releases their resources, in this case the filesystem LOCK files.
> h2. Possible solution code
> Branch: https://github.com/dpoldrugo/kafka/commits/infobip-fork
> Commit: [BUGFIX: When commit fails during rebalance - release resources|https://github.com/dpoldrugo/kafka/commit/af0d16fc5f8629ab0583c94edf3dbf41158b73f3]
> I have been running this fork in production for 3 days and the error has not come up.
> h2. Note
> This could be related to these issues: KAFKA-3708 and KAFKA-3938
> Additional conversation can be found here: [stream shut down due to no locks for state store|https://groups.google.com/forum/#!topic/confluent-platform/i5cwYhpUtx4]
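
The resource leak described in the issue (a store that is never closed keeps its LOCK file locked) can be illustrated by analogy with plain java.nio file locks. This is only a sketch under stated assumptions: RocksDB manages its LOCK file in native code, so java.nio.channels.FileLock is a stand-in here, and the class LockLeakSketch and its canLock helper are hypothetical names. It shows that a lock held through a channel that was never closed blocks a second attempt to lock the same file, and that closing the channel releases it.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class LockLeakSketch {

    /** Returns true if an exclusive lock on the file can be acquired right now. */
    static boolean canLock(Path lockFile) throws IOException {
        try (FileChannel channel = FileChannel.open(
                lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock lock = channel.tryLock();
            if (lock == null) {
                return false; // the lock is held by another process
            }
            lock.release();
            return true;
        } catch (OverlappingFileLockException e) {
            return false; // the lock is already held elsewhere in this JVM
        }
    }

    public static void main(String[] args) throws IOException {
        Path lockFile = Files.createTempFile("LOCK", null);

        // Simulates a store that was opened but never closed: the channel
        // stays open, so its lock stays held.
        FileChannel leaked = FileChannel.open(lockFile, StandardOpenOption.WRITE);
        FileLock held = leaked.tryLock();
        System.out.println("lock acquired by leaked channel: " + (held != null));

        System.out.println("can lock while leaked channel is open: " + canLock(lockFile));

        // Closing the channel releases the lock, analogous to closing the task.
        leaked.close();
        System.out.println("can lock after close: " + canLock(lockFile));

        Files.deleteIfExists(lockFile);
    }
}
```

File locking semantics are platform dependent, so this demonstrates only the general pattern, not the exact failure mode reported with RocksDB on CentOS.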



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)