You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by dz...@apache.org on 2022/02/16 06:32:30 UTC

[drill] branch master updated: DRILL-8137: Prevent reading union inputs after cancellation request (#2462)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b65e93  DRILL-8137: Prevent reading union inputs after cancellation request (#2462)
1b65e93 is described below

commit 1b65e9338f27cd8a429bb847e3562e9b4cbd5131
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Wed Feb 16 08:32:16 2022 +0200

    DRILL-8137: Prevent reading union inputs after cancellation request (#2462)
---
 .../exec/physical/impl/union/UnionAllRecordBatch.java | 10 +++++++++-
 .../src/test/java/org/apache/drill/TestUnionAll.java  | 19 +++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index df2ffed..c8b9879 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.union;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -71,7 +72,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
   private final List<TransferPair> transfers = new ArrayList<>();
   private final List<ValueVector> allocationVectors = new ArrayList<>();
   private int recordCount;
-  private UnionInputIterator unionInputIterator;
+  private Iterator<Pair<IterOutcome, BatchStatusWrappper>> unionInputIterator;
 
   public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
     super(config, context, true, children.get(0), children.get(1));
@@ -428,6 +429,13 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
   }
 
   @Override
+  protected void cancelIncoming() {
+    super.cancelIncoming();
+    // prevent iterating union inputs after the cancellation request
+    unionInputIterator = Collections.emptyIterator();
+  }
+
+  @Override
   public void close() {
     super.close();
     updateBatchMemoryManagerStats();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index 292602f..5a00473 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -1323,4 +1323,23 @@ public class TestUnionAll extends BaseTestQuery {
         .run();
   }
 
+  @Test // DRILL-8137
+  public void testUnionCancellation() throws Exception {
+    String query = "WITH foo AS\n" +
+      "  (SELECT 1 AS a FROM cp.`/tpch/nation.parquet`\n" +
+      "   UNION ALL\n" +
+      "   SELECT 1 AS a FROM cp.`/tpch/nation.parquet`\n" +
+      "   WHERE n_nationkey > (SELECT 1) )\n" +
+      "SELECT * FROM foo\n" +
+      "LIMIT 1";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("a")
+      .baselineValues(1)
+      .build()
+      .run();
+  }
+
 }