Posted to commits@drill.apache.org by dz...@apache.org on 2023/02/08 07:44:58 UTC
[drill] branch master updated: DRILL-8372: Unfreed buffers when running a LIMIT 0 query over delimited text (#2728)
This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 03ba19077c DRILL-8372: Unfreed buffers when running a LIMIT 0 query over delimited text (#2728)
03ba19077c is described below
commit 03ba19077c6e9834b5afcbd0217c4e43ef199f7b
Author: James Turton <91...@users.noreply.github.com>
AuthorDate: Wed Feb 8 09:44:52 2023 +0200
DRILL-8372: Unfreed buffers when running a LIMIT 0 query over delimited text (#2728)
---
.../exec/physical/impl/limit/LimitRecordBatch.java | 62 ++++----
.../test/java/org/apache/drill/TestBugFixes.java | 160 +++++++++++++--------
2 files changed, 130 insertions(+), 92 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index f69809664a..5783e8c24d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -47,6 +47,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
// Start offset of the records
private int recordStartOffset;
private int numberOfRecords;
+ // We cannot simply rely on BatchState.FIRST in our control logic because
+ // we reset when we encounter an EMIT outcome. Because of this we must keep
+ // our own, resettable, first batch indicator variable.
private boolean first = true;
private final List<TransferPair> transfers = Lists.newArrayList();
@@ -59,38 +62,37 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
@Override
public IterOutcome innerNext() {
- if (!first && !needMoreRecords(numberOfRecords)) {
- outgoingSv.setRecordCount(0);
- incoming.cancel();
- IterOutcome upStream = next(incoming);
-
- while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
- // Clear the memory for the incoming batch
- VectorAccessibleUtilities.clear(incoming);
-
- // clear memory for incoming sv (if any)
- if (incomingSv != null) {
- incomingSv.clear();
- }
- upStream = next(incoming);
- }
- // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
- if (upStream == EMIT) {
- // Clear the memory for the incoming batch
- VectorAccessibleUtilities.clear(incoming);
-
- // clear memory for incoming sv (if any)
- if (incomingSv != null) {
- incomingSv.clear();
- }
-
- refreshLimitState();
- return upStream;
+ if (first || needMoreRecords(numberOfRecords)) {
+ // We still have work to do. Pass control to the parent which will obtain
+ // the next batch and call us back in doWork.
+ return super.innerNext();
+ }
+
+ // Our work for downstream is done but we still need to wind up affairs
+ // upstream.
+ outgoingSv.setRecordCount(0);
+ incoming.cancel();
+
+ // Spool through any incoming batches containing data.
+ IterOutcome upStream;
+ do {
+ upStream = next(incoming);
+ // Free memory allocated for the incoming batch.
+ VectorAccessibleUtilities.clear(incoming);
+ if (incomingSv != null) {
+ incomingSv.clear();
}
- // other leaf operator behave as before.
- return NONE;
+ } while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA);
+
+ // Special case: EMIT. This means that the leaf operator is an
+ // UNNEST and we refresh the limit states and return EMIT.
+ if (upStream == EMIT) {
+ refreshLimitState();
+ return upStream;
}
- return super.innerNext();
+
+ // Finally, we return NONE as we must since we cancelled execution.
+ return NONE;
}
@Override
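
Editor's note on the hunk above: the core of the fix is that, once the limit is satisfied, the operator cancels its upstream and then drains every batch the upstream still delivers, clearing its value vectors and its selection vector so that no allocated buffers remain when the operator closes. Below is a minimal sketch of that drain-and-release pattern factored into a standalone helper. The helper name drainUpstream and its placement are illustrative only and are not part of this commit; next, incoming, incomingSv and VectorAccessibleUtilities.clear are the same members the patch itself uses.

  // Illustrative sketch only, not part of the commit: the drain-and-release
  // loop from innerNext() above, expressed as a hypothetical helper. Every
  // batch the upstream hands back after cancellation has its value vectors
  // cleared, and the incoming selection vector (if any) is cleared too, so
  // the operator's allocator holds no outstanding buffers at close.
  private IterOutcome drainUpstream() {
    IterOutcome upStream;
    do {
      upStream = next(incoming);
      // Free memory allocated for whatever the upstream just delivered.
      VectorAccessibleUtilities.clear(incoming);
      if (incomingSv != null) {
        incomingSv.clear();
      }
    } while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA);
    // The terminal outcome (EMIT or NONE) tells the caller how to wind up.
    return upStream;
  }
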
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
index f0a7959e1d..5f7f5fe18a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
@@ -28,9 +28,11 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.drill.categories.UnlikelyTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -38,10 +40,11 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(UnlikelyTest.class)
-public class TestBugFixes extends BaseTestQuery {
+public class TestBugFixes extends ClusterTest {
@BeforeClass
- public static void setupTestFiles() {
+ public static void setUp() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
dirTestWatcher.copyResourceToRoot(Paths.get("bugs", "DRILL-4192"));
}
@@ -51,7 +54,7 @@ public class TestBugFixes extends BaseTestQuery {
" from cp.`tpch/part.parquet` p1, cp.`tpch/part.parquet` p2 \n" +
" where p1.p_name = p2.p_name \n" +
" and p1.p_mfgr = p2.p_mfgr";
- test(select);
+ run(select);
}
@Ignore
@@ -63,38 +66,38 @@ public class TestBugFixes extends BaseTestQuery {
" left outer join cp.`tpch/customer.parquet` c \n" +
" on l.l_orderkey = c.c_custkey) as foo\n" +
" where x < 10000";
- test(select);
+ run(select);
}
@Test
public void testSysDrillbits() throws Exception {
- test("select * from sys.drillbits");
+ run("select * from sys.drillbits");
}
@Test
public void testVersionTable() throws Exception {
- test("select * from sys.version");
+ run("select * from sys.version");
}
@Test
public void DRILL883() throws Exception {
- test("select n1.n_regionkey from cp.`tpch/nation.parquet` n1, (select n_nationkey from cp.`tpch/nation.parquet`) as n2 where n1.n_nationkey = n2.n_nationkey");
+ run("select n1.n_regionkey from cp.`tpch/nation.parquet` n1, (select n_nationkey from cp.`tpch/nation.parquet`) as n2 where n1.n_nationkey = n2.n_nationkey");
}
@Test
public void DRILL1061() throws Exception {
String query = "select foo.mycol.x as COMPLEX_COL from (select convert_from('{ x : [1,2], y : 100 }', 'JSON') as mycol from cp.`tpch/nation.parquet`) as foo(mycol) limit 1";
- test(query);
+ run(query);
}
@Test
public void DRILL1126() throws Exception {
+ client.alterSystem(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
try {
- test(String.format("alter session set `%s` = true", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
String query = "select sum(cast(employee_id as decimal(38, 18))), avg(cast(employee_id as decimal(38, 18))) from cp.`employee.json` group by (department_id)";
- test(query);
+ run(query);
} finally {
- test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));
+ client.resetSystem(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
}
}
@@ -106,11 +109,11 @@ public class TestBugFixes extends BaseTestQuery {
*/
@Test
public void Drill3484() throws Exception {
+ client.alterSystem(ExecConstants.CAST_EMPTY_STRING_TO_NULL, true);
try {
- test("alter SYSTEM set `drill.exec.functions.cast_empty_string_to_null` = true;");
- test("select random() from sys.drillbits");
+ run("select random() from sys.drillbits");
} finally {
- test("alter SYSTEM set `drill.exec.functions.cast_empty_string_to_null` = false;");
+ client.resetSystem(ExecConstants.CAST_EMPTY_STRING_TO_NULL);
}
}
@@ -119,8 +122,8 @@ public class TestBugFixes extends BaseTestQuery {
// Drill will hit CanNotPlan, until we add code fix to transform the local LHS filter in left outer join properly.
public void testDRILL1337_LocalLeftFilterLeftOutJoin() throws Exception {
try {
- test("select count(*) from cp.`tpch/nation.parquet` n left outer join " +
- "cp.`tpch/region.parquet` r on n.n_regionkey = r.r_regionkey and n.n_nationkey > 10;");
+ run("select count(*) from cp.`tpch/nation.parquet` n left outer join " +
+ "cp.`tpch/region.parquet` r on n.n_regionkey = r.r_regionkey and n.n_nationkey > 10");
fail();
} catch (UserException e) {
// Expected;
@@ -129,8 +132,8 @@ public class TestBugFixes extends BaseTestQuery {
@Test
public void testDRILL1337_LocalRightFilterLeftOutJoin() throws Exception {
- test("select * from cp.`tpch/nation.parquet` n left outer join " +
- "cp.`tpch/region.parquet` r on n.n_regionkey = r.r_regionkey and r.r_name not like '%ASIA' order by r.r_name;");
+ run("select * from cp.`tpch/nation.parquet` n left outer join " +
+ "cp.`tpch/region.parquet` r on n.n_regionkey = r.r_regionkey and r.r_name not like '%%ASIA' order by r.r_name");
}
@Test
@@ -186,35 +189,52 @@ public class TestBugFixes extends BaseTestQuery {
@Test
public void testDRILL4771() throws Exception {
- final String query = "select count(*) cnt, avg(distinct emp.department_id) avd\n"
- + " from cp.`employee.json` emp";
- final String[] expectedPlans = {
- ".*Agg\\(group=\\[\\{\\}\\], cnt=\\[\\$SUM0\\(\\$1\\)\\], agg#1=\\[\\$SUM0\\(\\$0\\)\\], agg#2=\\[COUNT\\(\\$0\\)\\]\\)",
- ".*Agg\\(group=\\[\\{0\\}\\], cnt=\\[COUNT\\(\\)\\]\\)"};
- final String[] excludedPlans = {".*Join\\(condition=\\[true\\], joinType=\\[inner\\]\\).*"};
- PlanTestBase.testPlanMatchingPatterns(query, expectedPlans, excludedPlans);
- testBuilder()
- .sqlQuery(query)
- .unOrdered()
- .baselineColumns("cnt", "avd")
- .baselineValues(1155L, 10.416666666666666)
- .build().run();
-
- final String query1 = "select emp.gender, count(*) cnt, avg(distinct emp.department_id) avd\n"
- + " from cp.`employee.json` emp\n"
- + " group by gender";
- final String[] expectedPlans1 = {
- ".*Agg\\(group=\\[\\{0\\}\\], cnt=\\[\\$SUM0\\(\\$2\\)\\], agg#1=\\[\\$SUM0\\(\\$1\\)\\], agg#2=\\[COUNT\\(\\$1\\)\\]\\)",
- ".*Agg\\(group=\\[\\{0, 1\\}\\], cnt=\\[COUNT\\(\\)\\]\\)"};
- final String[] excludedPlans1 = {".*Join\\(condition=\\[true\\], joinType=\\[inner\\]\\).*"};
- PlanTestBase.testPlanMatchingPatterns(query1, expectedPlans1, excludedPlans1);
- testBuilder()
- .sqlQuery(query1)
- .unOrdered()
- .baselineColumns("gender", "cnt", "avd")
- .baselineValues("F", 601L, 10.416666666666666)
- .baselineValues("M", 554L, 11.9)
- .build().run();
+ {
+ String query = "select count(*) cnt, avg(distinct emp.department_id) avd\n"
+ + " from cp.`employee.json` emp";
+ String[] expectedPlans = {
+ ".*Agg\\(group=\\[\\{\\}\\], cnt=\\[\\$SUM0\\(\\$1\\)\\], agg#1=\\[\\$SUM0\\(\\$0\\)\\], agg#2=\\[COUNT\\(\\$0\\)\\]\\)",
+ ".*Agg\\(group=\\[\\{0\\}\\], cnt=\\[COUNT\\(\\)\\]\\)"};
+ String[] excludedPlans = {".*Join\\(condition=\\[true\\], joinType=\\[inner\\]\\).*"};
+
+ client.queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(expectedPlans)
+ .exclude(excludedPlans)
+ .match();
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("cnt", "avd")
+ .baselineValues(1155L, 10.416666666666666)
+ .build().run();
+ }
+ {
+ String query = "select emp.gender, count(*) cnt, avg(distinct emp.department_id) avd\n"
+ + " from cp.`employee.json` emp\n"
+ + " group by gender";
+ String[] expectedPlans = {
+ ".*Agg\\(group=\\[\\{0\\}\\], cnt=\\[\\$SUM0\\(\\$2\\)\\], agg#1=\\[\\$SUM0\\(\\$1\\)\\], agg#2=\\[COUNT\\(\\$1\\)\\]\\)",
+ ".*Agg\\(group=\\[\\{0, 1\\}\\], cnt=\\[COUNT\\(\\)\\]\\)"};
+ String[] excludedPlans = {".*Join\\(condition=\\[true\\], joinType=\\[inner\\]\\).*"};
+
+ client.queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include(expectedPlans)
+ .exclude(excludedPlans)
+ .match();
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("gender", "cnt", "avd")
+ .baselineValues("F", 601L, 10.416666666666666)
+ .baselineValues("M", 554L, 11.9)
+ .build().run();
+ }
}
@Test
@@ -250,7 +270,7 @@ public class TestBugFixes extends BaseTestQuery {
query.append("(CAST('1964-03-07' AS DATE)),");
}
query.append("(CAST('1951-05-16' AS DATE))) tbl(dt)");
- test(query.toString());
+ run(query.toString());
}
@Test // DRILL-4971
@@ -286,10 +306,10 @@ public class TestBugFixes extends BaseTestQuery {
@Test
public void testDRILL5269() throws Exception {
+ client.alterSession("planner.enable_nljoin_for_scalar_only", false);
+ client.alterSession(ExecConstants.SLICE_TARGET, 500);
try {
- test("ALTER SESSION SET `planner.enable_nljoin_for_scalar_only` = false");
- test("ALTER SESSION SET `planner.slice_target` = 500");
- test("\nSELECT `one` FROM (\n" +
+ run("\nSELECT `one` FROM (\n" +
" SELECT 1 `one` FROM cp.`tpch/nation.parquet`\n" +
" INNER JOIN (\n" +
" SELECT 2 `two` FROM cp.`tpch/nation.parquet`\n" +
@@ -302,21 +322,28 @@ public class TestBugFixes extends BaseTestQuery {
" SELECT count(1) `a_count` FROM cp.`tpch/nation.parquet`\n" +
") `t5` ON TRUE\n");
} finally {
- resetSessionOption("planner.enable_nljoin_for_scalar_only");
- resetSessionOption("planner.slice_target");
+ client.resetSession("planner.enable_nljoin_for_scalar_only");
+ client.resetSession(ExecConstants.SLICE_TARGET);
}
}
@Test
public void testDRILL6318() throws Exception {
- int rows = testSql("SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json`");
- Assert.assertEquals(11, rows);
-
- rows = testSql("SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json` LIMIT 3");
- Assert.assertEquals(3, rows);
-
- rows = testSql("SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json` LIMIT 3 OFFSET 5");
- Assert.assertEquals(3, rows);
+ {
+ String sql = "SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json`";
+ long recordCount = client.queryBuilder().sql(sql).run().recordCount();
+ Assert.assertEquals(11, recordCount);
+ }
+ {
+ String sql = "SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json` LIMIT 3";
+ long recordCount = client.queryBuilder().sql(sql).run().recordCount();
+ Assert.assertEquals(3, recordCount);
+ }
+ {
+ String sql = "SELECT FLATTEN(data) AS d FROM cp.`jsoninput/bug6318.json` LIMIT 3 OFFSET 5";
+ long recordCount = client.queryBuilder().sql(sql).run().recordCount();
+ Assert.assertEquals(3, recordCount);
+ }
}
@Test
@@ -333,4 +360,13 @@ public class TestBugFixes extends BaseTestQuery {
.build()
.run();
}
+
+ @Test
+ public void testDRILL8372() throws Exception {
+ // The 1994/ subdirectory is sufficient to exhibit the bug.
+ dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "csv", "1994"));
+ // Throws "SYSTEM ERROR: IllegalStateException: Allocator[op:0:0:4:EasySubScan]
+ // closed with outstanding buffers" when the bug is present.
+ run("select * from dfs.`multilevel/csv/1994` limit 0");
+ }
}
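
Editor's note on testDRILL8372 above: the test relies on the embedded cluster's allocator accounting, so a clean run is itself the assertion; when the bug is present the query fails with the IllegalStateException quoted in the test comment. As a hedged variant that is not part of the commit, the same reproduction can be written with the queryBuilder() API already used by testDRILL6318 in this class, additionally asserting the obvious LIMIT 0 row count:

  // Hedged sketch, not part of the commit: reproduce DRILL-8372 through the
  // queryBuilder() API and also assert that a LIMIT 0 query returns no rows.
  String sql = "select * from dfs.`multilevel/csv/1994` limit 0";
  long recordCount = client.queryBuilder().sql(sql).run().recordCount();
  Assert.assertEquals(0, recordCount);
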