You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by dz...@apache.org on 2022/02/16 06:32:30 UTC
[drill] branch master updated: DRILL-8137: Prevent reading union inputs after cancellation request (#2462)
This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 1b65e93 DRILL-8137: Prevent reading union inputs after cancellation request (#2462)
1b65e93 is described below
commit 1b65e9338f27cd8a429bb847e3562e9b4cbd5131
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Wed Feb 16 08:32:16 2022 +0200
DRILL-8137: Prevent reading union inputs after cancellation request (#2462)
---
.../exec/physical/impl/union/UnionAllRecordBatch.java | 10 +++++++++-
.../src/test/java/org/apache/drill/TestUnionAll.java | 19 +++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index df2ffed..c8b9879 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.union;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@@ -71,7 +72,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
private final List<TransferPair> transfers = new ArrayList<>();
private final List<ValueVector> allocationVectors = new ArrayList<>();
private int recordCount;
- private UnionInputIterator unionInputIterator;
+ private Iterator<Pair<IterOutcome, BatchStatusWrappper>> unionInputIterator;
public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
super(config, context, true, children.get(0), children.get(1));
@@ -428,6 +429,13 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
}
@Override
+ protected void cancelIncoming() {
+ super.cancelIncoming();
+ // prevent iterating union inputs after the cancellation request
+ unionInputIterator = Collections.emptyIterator();
+ }
+
+ @Override
public void close() {
super.close();
updateBatchMemoryManagerStats();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 292602f..5a00473 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -1323,4 +1323,23 @@ public class TestUnionAll extends BaseTestQuery {
.run();
}
+ @Test // DRILL-8137
+ public void testUnionCancellation() throws Exception {
+ String query = "WITH foo AS\n" +
+ " (SELECT 1 AS a FROM cp.`/tpch/nation.parquet`\n" +
+ " UNION ALL\n" +
+ " SELECT 1 AS a FROM cp.`/tpch/nation.parquet`\n" +
+ " WHERE n_nationkey > (SELECT 1) )\n" +
+ "SELECT * FROM foo\n" +
+ "LIMIT 1";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("a")
+ .baselineValues(1)
+ .build()
+ .run();
+ }
+
}