You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/05/14 10:51:42 UTC
[flink] branch master updated: [FLINK-12507][table-runtime-blink]
Fix AsyncLookupJoin doesn't close all generated ResultFutures
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a42b65c [FLINK-12507][table-runtime-blink] Fix AsyncLookupJoin doesn't close all generated ResultFutures
a42b65c is described below
commit a42b65c1ea8850ebc16dcd8d9913651e0837cb36
Author: Jark Wu <im...@gmail.com>
AuthorDate: Tue May 14 13:26:09 2019 +0800
[FLINK-12507][table-runtime-blink] Fix AsyncLookupJoin doesn't close all generated ResultFutures
This closes #8436
---
.../table/runtime/join/lookup/AsyncLookupJoinRunner.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/AsyncLookupJoinRunner.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/AsyncLookupJoinRunner.java
index 4a15671..508e1cd 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/AsyncLookupJoinRunner.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/AsyncLookupJoinRunner.java
@@ -65,6 +65,13 @@ public class AsyncLookupJoinRunner extends RichAsyncFunction<BaseRow, BaseRow> {
* We use {@link BlockingQueue} to make sure the head {@link ResultFuture}s are available.
*/
private transient BlockingQueue<JoinedRowResultFuture> resultFutureBuffer;
+ /**
+ * A collection containing all ResultFutures in the runner, used to invoke
+ * {@code close()} on every ResultFuture. {@link #resultFutureBuffer} may not
+ * contain all the ResultFutures because ResultFutures are polled from the buffer
+ * during processing.
+ */
+ private transient List<JoinedRowResultFuture> allResultFutures;
public AsyncLookupJoinRunner(
GeneratedFunction<AsyncFunction<BaseRow, Object>> generatedFetcher,
@@ -105,6 +112,7 @@ public class AsyncLookupJoinRunner extends RichAsyncFunction<BaseRow, BaseRow> {
// asyncBufferCapacity + 1 as the queue size in order to avoid
// blocking on the queue when taking a collector.
this.resultFutureBuffer = new ArrayBlockingQueue<>(asyncBufferCapacity + 1);
+ this.allResultFutures = new ArrayList<>();
for (int i = 0; i < asyncBufferCapacity + 1; i++) {
JoinedRowResultFuture rf = new JoinedRowResultFuture(
resultFutureBuffer,
@@ -114,6 +122,7 @@ public class AsyncLookupJoinRunner extends RichAsyncFunction<BaseRow, BaseRow> {
rightRowTypeInfo.getArity());
// add will throw exception immediately if the queue is full which should never happen
resultFutureBuffer.add(rf);
+ allResultFutures.add(rf);
}
}
@@ -141,7 +150,7 @@ public class AsyncLookupJoinRunner extends RichAsyncFunction<BaseRow, BaseRow> {
if (fetcher != null) {
FunctionUtils.closeFunction(fetcher);
}
- for (JoinedRowResultFuture rf : resultFutureBuffer) {
+ for (JoinedRowResultFuture rf : allResultFutures) {
rf.close();
}
}