You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/05/14 10:51:42 UTC

[flink] branch master updated: [FLINK-12507][table-runtime-blink] Fix AsyncLookupJoin doesn't close all generated ResultFutures

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a42b65c  [FLINK-12507][table-runtime-blink] Fix AsyncLookupJoin doesn't close all generated ResultFutures
a42b65c is described below

commit a42b65c1ea8850ebc16dcd8d9913651e0837cb36
Author: Jark Wu <im...@gmail.com>
AuthorDate: Tue May 14 13:26:09 2019 +0800

    [FLINK-12507][table-runtime-blink] Fix AsyncLookupJoin doesn't close all generated ResultFutures
    
    This closes #8436
---
 .../table/runtime/join/lookup/AsyncLookupJoinRunner.java      | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/AsyncLookupJoinRunner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/AsyncLookupJoinRunner.java
index 4a15671..508e1cd 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/AsyncLookupJoinRunner.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/AsyncLookupJoinRunner.java
@@ -65,6 +65,13 @@ public class AsyncLookupJoinRunner extends RichAsyncFunction<BaseRow, BaseRow> {
 	 * We use {@link BlockingQueue} to make sure the head {@link ResultFuture}s are available.
 	 */
 	private transient BlockingQueue<JoinedRowResultFuture> resultFutureBuffer;
+	/**
+	 * A Collection contains all ResultFutures in the runner which is used to invoke
+	 * {@code close()} on every ResultFuture. {@link #resultFutureBuffer} may not
+	 * contain all the ResultFutures because ResultFutures will be polled from the buffer when
+	 * processing.
+	 */
+	private transient List<JoinedRowResultFuture> allResultFutures;
 
 	public AsyncLookupJoinRunner(
 			GeneratedFunction<AsyncFunction<BaseRow, Object>> generatedFetcher,
@@ -105,6 +112,7 @@ public class AsyncLookupJoinRunner extends RichAsyncFunction<BaseRow, BaseRow> {
 		// asyncBufferCapacity + 1 as the queue size in order to avoid
 		// blocking on the queue when taking a collector.
 		this.resultFutureBuffer = new ArrayBlockingQueue<>(asyncBufferCapacity + 1);
+		this.allResultFutures = new ArrayList<>();
 		for (int i = 0; i < asyncBufferCapacity + 1; i++) {
 			JoinedRowResultFuture rf = new JoinedRowResultFuture(
 				resultFutureBuffer,
@@ -114,6 +122,7 @@ public class AsyncLookupJoinRunner extends RichAsyncFunction<BaseRow, BaseRow> {
 				rightRowTypeInfo.getArity());
 			// add will throw exception immediately if the queue is full which should never happen
 			resultFutureBuffer.add(rf);
+			allResultFutures.add(rf);
 		}
 	}
 
@@ -141,7 +150,7 @@ public class AsyncLookupJoinRunner extends RichAsyncFunction<BaseRow, BaseRow> {
 		if (fetcher != null) {
 			FunctionUtils.closeFunction(fetcher);
 		}
-		for (JoinedRowResultFuture rf : resultFutureBuffer) {
+		for (JoinedRowResultFuture rf : allResultFutures) {
 			rf.close();
 		}
 	}