Posted to commits@flink.apache.org by mx...@apache.org on 2020/03/16 11:47:12 UTC

[flink] branch release-1.9 updated: [FLINK-16573] Ensure Kinesis RecordFetcher threads shutdown on cancel

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new a9bbcc6  [FLINK-16573] Ensure Kinesis RecordFetcher threads shutdown on cancel
a9bbcc6 is described below

commit a9bbcc69e5bc3ca9a53a802064d96fbbef4f5b81
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Thu Mar 12 13:23:01 2020 +0100

    [FLINK-16573] Ensure Kinesis RecordFetcher threads shutdown on cancel
    
    The threads may not shut down correctly because they do not check for the
    running flag in the inner loops. The threads also do not get interrupted because
    they are not connected to the main task thread.
    
    These threads keep lingering around after the job has shut down:
    
    ```
    Thread 23168: (state = BLOCKED)
     - java.lang.Object.wait(long) @bci=0 (Compiled frame; information may be imprecise)
     - org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.emitRecords() @bci=140, line=209 (Compiled frame)
     - org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter.run() @bci=18, line=177 (Interpreted frame)
     - java.lang.Thread.run() @bci=11, line=748 (Compiled frame)
    ```
---
 .../flink/streaming/connectors/kinesis/util/RecordEmitter.java    | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
index 95c3688..56182fc 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -198,6 +198,10 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
 						}
 					}
 				}
+				if (!running) {
+					// Make sure we can exit this loop so the thread can shut down
+					break runLoop;
+				}
 			}
 
 			// wait until ready to emit min or another queue receives elements
@@ -216,6 +220,10 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
 						continue runLoop;
 					}
 				}
+				if (!running) {
+					// Make sure we can exit this loop so the thread can shut down
+					break runLoop;
+				}
 			}
 
 			// emit up to queue capacity records
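
The pattern the patch applies can be shown outside Flink with a minimal, self-contained sketch. It is not the actual RecordEmitter code: the class name `EmitterShutdownSketch`, the `condition` lock object, the single `queue`, and the `offer`/`stop` methods are all hypothetical and exist only for illustration. The sketch assumes a volatile `running` flag and a labeled outer loop, as in the diff above, and re-checks the flag inside the inner wait loop so that a thread waiting for elements can still exit.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for an emitter thread; not the Flink RecordEmitter. */
class EmitterShutdownSketch implements Runnable {

    private final Object condition = new Object();
    private final Queue<String> queue = new ArrayDeque<>();
    private volatile boolean running = true;

    @Override
    public void run() {
        runLoop:
        while (running) {
            String next;
            synchronized (condition) {
                // Inner wait loop: the outer loop condition is not evaluated
                // while we are in here, so the flag must be checked again.
                while (queue.isEmpty()) {
                    try {
                        condition.wait(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break runLoop;
                    }
                    if (!running) {
                        // Make sure we can exit this loop so the thread can shut down
                        break runLoop;
                    }
                }
                next = queue.poll();
            }
            System.out.println("emitted " + next);
        }
    }

    /** Adds a record and wakes up the emitter thread. */
    public void offer(String record) {
        synchronized (condition) {
            queue.add(record);
            condition.notifyAll();
        }
    }

    /** Clears the running flag and wakes up the emitter thread. */
    public void stop() {
        running = false;
        synchronized (condition) {
            condition.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        EmitterShutdownSketch emitter = new EmitterShutdownSketch();
        Thread thread = new Thread(emitter, "emitter-sketch");
        thread.start();
        emitter.offer("record-1");
        emitter.stop();
        thread.join(2000);
        System.out.println("thread alive after stop: " + thread.isAlive());
    }
}
```

Without the `if (!running)` check, the inner `while (queue.isEmpty())` loop would keep waiting on an empty queue after `stop()` is called, which matches the lingering thread in the stack trace from the commit message. Whether `record-1` is emitted before shutdown depends on thread timing in this sketch.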