Posted to commits@drill.apache.org by dz...@apache.org on 2023/02/08 07:44:58 UTC

[drill] branch master updated: DRILL-8372: Unfreed buffers when running a LIMIT 0 query over delimited text (#2728)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 03ba19077c DRILL-8372: Unfreed buffers when running a LIMIT 0 query over delimited text (#2728)
03ba19077c is described below

commit 03ba19077c6e9834b5afcbd0217c4e43ef199f7b
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Wed Feb 8 09:44:52 2023 +0200

    DRILL-8372: Unfreed buffers when running a LIMIT 0 query over delimited text (#2728)
---
 .../exec/physical/impl/limit/LimitRecordBatch.java |  62 ++++----
 .../test/java/org/apache/drill/TestBugFixes.java   | 160 +++++++++++++--------
 2 files changed, 130 insertions(+), 92 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index f69809664a..5783e8c24d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -47,6 +47,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   // Start offset of the records
   private int recordStartOffset;
   private int numberOfRecords;
+  // We cannot simply rely on BatchState.FIRST in our control logic because
+  // we reset when we encounter an EMIT outcome. Because of this we must keep
+  // our own, resettable, first batch indicator variable.
   private boolean first = true;
   private final List<TransferPair> transfers = Lists.newArrayList();
 
@@ -59,38 +62,37 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
 
   @Override
   public IterOutcome innerNext() {
-    if (!first && !needMoreRecords(numberOfRecords)) {
-      outgoingSv.setRecordCount(0);
-      incoming.cancel();
-      IterOutcome upStream = next(incoming);
-
-      while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
-        // Clear the memory for the incoming batch
-        VectorAccessibleUtilities.clear(incoming);
-
-        // clear memory for incoming sv (if any)
-        if (incomingSv != null) {
-          incomingSv.clear();
-        }
-        upStream = next(incoming);
-      }
-      // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
-      if (upStream == EMIT) {
-        // Clear the memory for the incoming batch
-        VectorAccessibleUtilities.clear(incoming);
-
-        // clear memory for incoming sv (if any)
-        if (incomingSv != null) {
-          incomingSv.clear();
-        }
-
-        refreshLimitState();
-        return upStream;
+    if (first || needMoreRecords(numberOfRecords)) {
+      // We still have work to do. Pass control to the parent which will obtain
+      // the next batch and call us back in doWork.
+      return super.innerNext();
+    }
+
+    // Our work for downstream is done but we still need to wind up affairs
+    // upstream.
+    outgoingSv.setRecordCount(0);
+    incoming.cancel();
+
+    // Spool through any incoming batches containing data.
+    IterOutcome upStream;
+    do {
+      upStream = next(incoming);
+      // Free memory allocated for the incoming batch.
+      VectorAccessibleUtilities.clear(incoming);
+      if (incomingSv != null) {
+        incomingSv.clear();
       }
-      // other leaf operator behave as before.
-      return NONE;
+    } while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA);
+
+    // Special case: EMIT. This means that the leaf operator is an
+    // UNNEST and we refresh the limit states and return EMIT.
+    if (upStream == EMIT) {
+      refreshLimitState();
+      return upStream;
     }
-    return super.innerNext();
+
+    // Finally, we return NONE as we must since we cancelled execution.
+    return NONE;
   }
 
   @Override
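
A note on the hunk above: in the old control flow the incoming batch was
cleared only when the drain loop observed OK, OK_NEW_SCHEMA or EMIT, so a
batch delivered with any other terminal outcome kept its buffers allocated.
The rewrite clears the incoming vectors inside a do-while body that runs
after every next(incoming) call, whichever outcome ends the drain. A
minimal self-contained sketch of the difference, using hypothetical names
(fetch() and clearBuffers() stand in for Drill's next(incoming) and
VectorAccessibleUtilities.clear(incoming)):

    // Hypothetical sketch only, not Drill code.
    class DrainSketch {
      enum Outcome { OK, DONE }
      private int remaining = 3;

      Outcome fetch() { return remaining-- > 0 ? Outcome.OK : Outcome.DONE; }
      void clearBuffers() { /* free the current batch's buffers here */ }

      // Old shape: the batch returned by the final fetch() is never cleared.
      void drainLeaky() {
        Outcome out = fetch();
        while (out == Outcome.OK) {
          clearBuffers();
          out = fetch();
        }
      }

      // New shape: clears after every fetch(), terminal outcome included.
      void drainFixed() {
        Outcome out;
        do {
          out = fetch();
          clearBuffers();
        } while (out == Outcome.OK);
      }
    }
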
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
index f0a7959e1d..5f7f5fe18a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
@@ -28,9 +28,11 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.categories.UnlikelyTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -38,10 +40,11 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category(UnlikelyTest.class)
-public class TestBugFixes extends BaseTestQuery {
+public class TestBugFixes extends ClusterTest {
 
   @BeforeClass
-  public static void setupTestFiles() {
+  public static void setUp() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
     dirTestWatcher.copyResourceToRoot(Paths.get("bugs", "DRILL-4192"));
   }
 
@@ -51,7 +54,7 @@ public class TestBugFixes extends BaseTestQuery {
         "    from cp.`tpch/part.parquet` p1, cp.`tpch/part.parquet` p2 \n" +
         "    where p1.p_name = p2.p_name \n" +
         "  and p1.p_mfgr = p2.p_mfgr";
-    test(select);
+    run(select);
   }
 
   @Ignore
@@ -63,38 +66,38 @@ public class TestBugFixes extends BaseTestQuery {
         "    left outer join cp.`tpch/customer.parquet` c \n" +
         "      on l.l_orderkey = c.c_custkey) as foo\n" +
         "  where x < 10000";
-    test(select);
+    run(select);
   }
 
   @Test
   public void testSysDrillbits() throws Exception {
-    test("select * from sys.drillbits");
+    run("select * from sys.drillbits");
   }
 
   @Test
   public void testVersionTable() throws Exception {
-    test("select * from sys.version");
+    run("select * from sys.version");
   }
 
   @Test
   public void DRILL883() throws Exception {
-    test("select n1.n_regionkey from cp.`tpch/nation.parquet` n1, (select n_nationkey from cp.`tpch/nation.parquet`) as n2 where n1.n_nationkey = n2.n_nationkey");
+    run("select n1.n_regionkey from cp.`tpch/nation.parquet` n1, (select n_nationkey from cp.`tpch/nation.parquet`) as n2 where n1.n_nationkey = n2.n_nationkey");
   }
 
   @Test
   public void DRILL1061() throws Exception {
     String query = "select foo.mycol.x as COMPLEX_COL from (select convert_from('{ x : [1,2], y : 100 }', 'JSON') as mycol from cp.`tpch/nation.parquet`) as foo(mycol) limit 1";
-    test(query);
+    run(query);
   }
 
   @Test
   public void DRILL1126() throws Exception {
+    client.alterSystem(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
     try {
-      test(String.format("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
       String query = "select sum(cast(employee_id as decimal(38, 18))), avg(cast(employee_id as decimal(38, 18))) from cp.`employee.json` group by (department_id)";
-      test(query);
+      run(query);
     } finally {
-      test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
+      client.resetSystem(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
     }
   }
 
@@ -106,11 +109,11 @@ public class TestBugFixes extends BaseTestQuery {
    */
   @Test
   public void Drill3484() throws Exception {
+    client.alterSystem(ExecConstants.CAST_EMPTY_STRING_TO_NULL, true);
     try {
-      test("alter SYSTEM set `drill.exec.functions.cast_empty_string_to_null` = true;");
-      test("select random() from sys.drillbits");
+      run("select random() from sys.drillbits");
     } finally {
-      test("alter SYSTEM set `drill.exec.functions.cast_empty_string_to_null` = false;");
+      client.resetSystem(ExecConstants.CAST_EMPTY_STRING_TO_NULL);
     }
   }
 
@@ -119,8 +122,8 @@ public class TestBugFixes extends BaseTestQuery {
   // Drill will hit CanNotPlan, until we add code fix to transform the local LHS filter in left outer join properly.
   public void testDRILL1337_LocalLeftFilterLeftOutJoin() throws Exception {
     try {
-      test("select count(*) from cp.`tpch/nation.parquet` n left outer join " +
-           "cp.`tpch/region.parquet` r on n.n_regionkey = r.r_regionkey and n.n_nationkey > 10;");
+      run("select count(*) from cp.`tpch/nation.parquet` n left outer join " +
+           "cp.`tpch/region.parquet` r on n.n_regionkey = r.r_regionkey and n.n_nationkey > 10");
       fail();
     } catch (UserException e) {
       // Expected;
@@ -129,8 +132,8 @@ public class TestBugFixes extends BaseTestQuery {
 
   @Test
   public void testDRILL1337_LocalRightFilterLeftOutJoin() throws Exception {
-    test("select * from cp.`tpch/nation.parquet` n left outer join " +
-         "cp.`tpch/region.parquet` r on n.n_regionkey = r.r_regionkey and r.r_name not like '%ASIA' order by r.r_name;");
+    run("select * from cp.`tpch/nation.parquet` n left outer join " +
+         "cp.`tpch/region.parquet` r on n.n_regionkey = r.r_regionkey and r.r_name not like '%%ASIA' order by r.r_name");
   }
 
   @Test
@@ -186,35 +189,52 @@ public class TestBugFixes extends BaseTestQuery {
 
   @Test
   public void testDRILL4771() throws Exception {
-    final String query = "select count(*) cnt, avg(distinct emp.department_id) avd\n"
-        + " from cp.`employee.json` emp";
-    final String[] expectedPlans = {
-        ".*Agg\\(group=\\[\\{\\}\\], cnt=\\[\\$SUM0\\(\\$1\\)\\], agg#1=\\[\\$SUM0\\(\\$0\\)\\], agg#2=\\[COUNT\\(\\$0\\)\\]\\)",
-        ".*Agg\\(group=\\[\\{0\\}\\], cnt=\\[COUNT\\(\\)\\]\\)"};
-    final String[] excludedPlans = {".*Join\\(condition=\\[true\\], joinType=\\[inner\\]\\).*"};
-    PlanTestBase.testPlanMatchingPatterns(query, expectedPlans, excludedPlans);
-    testBuilder()
-        .sqlQuery(query)
-        .unOrdered()
-        .baselineColumns("cnt", "avd")
-        .baselineValues(1155L, 10.416666666666666)
-        .build().run();
-
-    final String query1 = "select emp.gender, count(*) cnt, avg(distinct emp.department_id) avd\n"
-            + " from cp.`employee.json` emp\n"
-            + " group by gender";
-    final String[] expectedPlans1 = {
-            ".*Agg\\(group=\\[\\{0\\}\\], cnt=\\[\\$SUM0\\(\\$2\\)\\], agg#1=\\[\\$SUM0\\(\\$1\\)\\], agg#2=\\[COUNT\\(\\$1\\)\\]\\)",
-            ".*Agg\\(group=\\[\\{0, 1\\}\\], cnt=\\[COUNT\\(\\)\\]\\)"};
-    final String[] excludedPlans1 = {".*Join\\(condition=\\[true\\], joinType=\\[inner\\]\\).*"};
-    PlanTestBase.testPlanMatchingPatterns(query1, expectedPlans1, excludedPlans1);
-    testBuilder()
-            .sqlQuery(query1)
-            .unOrdered()
-            .baselineColumns("gender", "cnt", "avd")
-            .baselineValues("F", 601L, 10.416666666666666)
-            .baselineValues("M", 554L, 11.9)
-            .build().run();
+    {
+      String query = "select count(*) cnt, avg(distinct emp.department_id) avd\n"
+          + " from cp.`employee.json` emp";
+      String[] expectedPlans = {
+          ".*Agg\\(group=\\[\\{\\}\\], cnt=\\[\\$SUM0\\(\\$1\\)\\], agg#1=\\[\\$SUM0\\(\\$0\\)\\], agg#2=\\[COUNT\\(\\$0\\)\\]\\)",
+          ".*Agg\\(group=\\[\\{0\\}\\], cnt=\\[COUNT\\(\\)\\]\\)"};
+      String[] excludedPlans = {".*Join\\(condition=\\[true\\], joinType=\\[inner\\]\\).*"};
+
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(expectedPlans)
+        .exclude(excludedPlans)
+        .match();
+
+      testBuilder()
+          .sqlQuery(query)
+          .unOrdered()
+          .baselineColumns("cnt", "avd")
+          .baselineValues(1155L, 10.416666666666666)
+          .build().run();
+    }
+    {
+      String query = "select emp.gender, count(*) cnt, avg(distinct emp.department_id) avd\n"
+              + " from cp.`employee.json` emp\n"
+              + " group by gender";
+      String[] expectedPlans = {
+              ".*Agg\\(group=\\[\\{0\\}\\], cnt=\\[\\$SUM0\\(\\$2\\)\\], agg#1=\\[\\$SUM0\\(\\$1\\)\\], agg#2=\\[COUNT\\(\\$1\\)\\]\\)",
+              ".*Agg\\(group=\\[\\{0, 1\\}\\], cnt=\\[COUNT\\(\\)\\]\\)"};
+      String[] excludedPlans = {".*Join\\(condition=\\[true\\], joinType=\\[inner\\]\\).*"};
+
+      client.queryBuilder()
+        .sql(query)
+        .planMatcher()
+        .include(expectedPlans)
+        .exclude(excludedPlans)
+        .match();
+
+      testBuilder()
+              .sqlQuery(query)
+              .unOrdered()
+              .baselineColumns("gender", "cnt", "avd")
+              .baselineValues("F", 601L, 10.416666666666666)
+              .baselineValues("M", 554L, 11.9)
+              .build().run();
+    }
   }
 
   @Test
@@ -250,7 +270,7 @@ public class TestBugFixes extends BaseTestQuery {
       query.append("(CAST('1964-03-07' AS DATE)),");
     }
     query.append("(CAST('1951-05-16' AS DATE))) tbl(dt)");
-    test(query.toString());
+    run(query.toString());
   }
 
   @Test // DRILL-4971
@@ -286,10 +306,10 @@ public class TestBugFixes extends BaseTestQuery {
 
   @Test
   public void testDRILL5269() throws Exception {
+    client.alterSession("planner.enable_nljoin_for_scalar_only", false);
+    client.alterSession(ExecConstants.SLICE_TARGET, 500);
     try {
-      test("ALTER SESSION SET `planner.enable_nljoin_for_scalar_only` = false");
-      test("ALTER SESSION SET `planner.slice_target` = 500");
-      test("\nSELECT `one` FROM (\n" +
+      run("\nSELECT `one` FROM (\n" +
           "  SELECT 1 `one` FROM cp.`tpch/nation.parquet`\n" +
           "  INNER JOIN (\n" +
           "    SELECT 2 `two` FROM cp.`tpch/nation.parquet`\n" +
@@ -302,21 +322,28 @@ public class TestBugFixes extends BaseTestQuery {
           "    SELECT count(1) `a_count` FROM cp.`tpch/nation.parquet`\n" +
           ") `t5` ON TRUE\n");
     } finally {
-      resetSessionOption("planner.enable_nljoin_for_scalar_only");
-      resetSessionOption("planner.slice_target");
+      client.resetSession("planner.enable_nljoin_for_scalar_only");
+      client.resetSession(ExecConstants.SLICE_TARGET);
     }
   }
 
   @Test
   public void testDRILL6318() throws Exception {
-    int rows = testSql("SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json`");
-    Assert.assertEquals(11, rows);
-
-    rows = testSql("SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json` LIMIT 3");
-    Assert.assertEquals(3, rows);
-
-    rows = testSql("SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json` LIMIT 3 OFFSET 5");
-    Assert.assertEquals(3, rows);
+    {
+      String sql = "SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json`";
+      long recordCount = client.queryBuilder().sql(sql).run().recordCount();
+      Assert.assertEquals(11, recordCount);
+    }
+    {
+      String sql = "SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json` LIMIT 3";
+      long recordCount = client.queryBuilder().sql(sql).run().recordCount();
+      Assert.assertEquals(3, recordCount);
+    }
+    {
+      String sql = "SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json` LIMIT 3 OFFSET 5";
+      long recordCount = client.queryBuilder().sql(sql).run().recordCount();
+      Assert.assertEquals(3, recordCount);
+    }
   }
 
   @Test
@@ -333,4 +360,13 @@ public class TestBugFixes extends BaseTestQuery {
         .build()
         .run();
   }
+
+  @Test
+  public void testDRILL8372() throws Exception {
+    // The 1994/ subdirectory is sufficient to exhibit the bug.
+    dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "csv", "1994"));
+    // Throws "SYSTEM ERROR: IllegalStateException: Allocator[op:0:0:4:EasySubScan]
+    // closed with outstanding buffers" when the bug is present.
+    run("select * from dfs.`multilevel/csv/1994` limit 0");
+  }
 }
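
A note on the test changes: the commit also migrates TestBugFixes from the
legacy BaseTestQuery base class to the ClusterTest fixture, replacing
test(...) with run(...) and ad-hoc ALTER SESSION / ALTER SYSTEM statements
with client.alterSession / client.alterSystem calls paired with resets in
finally blocks. A minimal sketch of that idiom, assembled from the API
calls visible in the diff (the option and query here are illustrative only,
not taken from the commit):

    import org.apache.drill.exec.ExecConstants;
    import org.apache.drill.test.ClusterFixture;
    import org.apache.drill.test.ClusterTest;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class ExampleClusterTest extends ClusterTest {
      @BeforeClass
      public static void setUp() throws Exception {
        // Start an embedded Drillbit shared by all tests in the class.
        startCluster(ClusterFixture.builder(dirTestWatcher));
      }

      @Test
      public void optionScopedQuery() throws Exception {
        client.alterSession(ExecConstants.SLICE_TARGET, 500);
        try {
          run("select 1 from cp.`tpch/nation.parquet` limit 1");
        } finally {
          // Restore the option even if the query fails.
          client.resetSession(ExecConstants.SLICE_TARGET);
        }
      }
    }

The new regression test itself, testDRILL8372, can be run in isolation with
the usual Maven Surefire method selector (-Dtest=TestBugFixes#testDRILL8372
in the exec/java-exec module) to confirm that no "closed with outstanding
buffers" error is raised.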