You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/07/04 03:00:10 UTC
[1/3] drill git commit: DRILL-4722: Fix EqualityVisitor for interval
day expressions with millis
Repository: drill
Updated Branches:
refs/heads/master f5b975adf -> 35a1ec667
DRILL-4722: Fix EqualityVisitor for interval day expressions with millis
closes #861
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0b830ef5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0b830ef5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0b830ef5
Branch: refs/heads/master
Commit: 0b830ef5370ad865c13da64121cd8a0e366dfd7b
Parents: f5b975a
Author: Volodymyr Vysotskyi <vv...@gmail.com>
Authored: Mon Jun 26 18:12:34 2017 +0000
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Jul 3 18:00:20 2017 -0700
----------------------------------------------------------------------
.../apache/drill/exec/expr/EqualityVisitor.java | 6 +-
.../exec/fn/impl/TestDateAddFunctions.java | 79 ++++++++++++++++++++
2 files changed, 82 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0b830ef5/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EqualityVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EqualityVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EqualityVisitor.java
index 5f79f32..44ff78c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EqualityVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EqualityVisitor.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -44,7 +44,6 @@ import org.apache.drill.common.expression.ValueExpressions.QuotedString;
import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
import java.util.List;
@@ -169,7 +168,8 @@ class EqualityVisitor extends AbstractExprVisitor<Boolean,LogicalExpression,Runt
if (!(value instanceof IntervalDayExpression)) {
return false;
}
- return intExpr.getIntervalDay() == ((IntervalDayExpression) value).getIntervalDay();
+ return intExpr.getIntervalDay() == ((IntervalDayExpression) value).getIntervalDay()
+ && intExpr.getIntervalMillis() == ((IntervalDayExpression) value).getIntervalMillis();
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/0b830ef5/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestDateAddFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestDateAddFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestDateAddFunctions.java
new file mode 100644
index 0000000..7a5a49a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestDateAddFunctions.java
@@ -0,0 +1,79 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec.fn.impl;
+
+import org.apache.drill.BaseTestQuery;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+public class TestDateAddFunctions extends BaseTestQuery {
+
+ @Test
+ public void testDateAddIntervalDay() throws Exception {
+ String query = "select date_add(timestamp '2015-01-24 07:27:05.0', interval '3' day) as col1,\n" +
+ "date_add(timestamp '2015-01-24 07:27:05.0', interval '5' day) as col2,\n" +
+ "date_add(timestamp '2015-01-24 07:27:05.0', interval '5' hour) as col3,\n" +
+ "date_add(timestamp '2015-01-24 07:27:05.0', interval '5' minute) as col4,\n" +
+ "date_add(timestamp '2015-01-24 07:27:05.0', interval '5' second) as col5,\n" +
+ "date_add(timestamp '2015-01-24 07:27:05.0', interval '5 10:20:30' day to second) as col6\n" +
+ "from (values(1))";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("col1", "col2", "col3", "col4", "col5", "col6")
+ .baselineValues(DateTime.parse("2015-01-27T07:27:05.0"),
+ DateTime.parse("2015-01-29T07:27:05.0"),
+ DateTime.parse("2015-01-24T12:27:05.0"),
+ DateTime.parse("2015-01-24T07:32:05.0"),
+ DateTime.parse("2015-01-24T07:27:10.0"),
+ DateTime.parse("2015-01-29T17:47:35.0"))
+ .go();
+ }
+
+ @Test
+ public void testDateAddIntervalYear() throws Exception {
+ String query = "select date_add(date '2015-01-24', interval '3' month) as col1,\n" +
+ "date_add(date '2015-01-24', interval '5' month) as col2,\n" +
+ "date_add(date '2015-01-24', interval '5' year) as col3\n" +
+ "from (values(1))";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("col1", "col2", "col3")
+ .baselineValues(DateTime.parse("2015-04-24"),
+ DateTime.parse("2015-06-24"),
+ DateTime.parse("2020-01-24"))
+ .go();
+ }
+
+ @Test
+ public void testDateAddIntegerAsDay() throws Exception {
+ String query = "select date_add(date '2015-01-24', 3) as col1,\n" +
+ "date_add(date '2015-01-24', 5) as col2\n" +
+ "from (values(1))";
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("col1", "col2")
+ .baselineValues(DateTime.parse("2015-01-27"),
+ DateTime.parse("2015-01-29"))
+ .go();
+ }
+}
[3/3] drill git commit: DRILL-4970: Prevent changing the negative
value of input holder for cast functions
Posted by pr...@apache.org.
DRILL-4970: Prevent changing the negative value of input holder for cast functions
closes #863
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/35a1ec66
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/35a1ec66
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/35a1ec66
Branch: refs/heads/master
Commit: 35a1ec66726fc5a77ea61cac3b6f3bbc33b8564a
Parents: 0e1e604
Author: Volodymyr Vysotskyi <vv...@gmail.com>
Authored: Thu Jun 29 15:28:05 2017 +0000
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Jul 3 18:03:35 2017 -0700
----------------------------------------------------------------------
.../main/codegen/templates/CastFunctions.java | 19 +-
.../templates/Decimal/CastDecimalInt.java | 14 +-
.../templates/Decimal/CastDecimalVarchar.java | 11 +-
.../drill/exec/fn/impl/TestCastFunctions.java | 439 ++++++++++++++++++-
.../resources/decimal/cast_decimal_float.json | 45 ++
.../resources/decimal/cast_decimal_int.json | 45 ++
.../resources/decimal/cast_int_decimal.json | 45 ++
.../test/resources/input_simple_decimal.json | 24 +-
8 files changed, 612 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/main/codegen/templates/CastFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/CastFunctions.java b/exec/java-exec/src/main/codegen/templates/CastFunctions.java
index f68da06..4a1f67f 100644
--- a/exec/java-exec/src/main/codegen/templates/CastFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/CastFunctions.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -43,8 +43,10 @@ import org.apache.drill.exec.record.RecordBatch;
*/
@SuppressWarnings("unused")
-@FunctionTemplate(name = "cast${type.to?upper_case}", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.NULL_IF_NULL)
-public class Cast${type.from}${type.to} implements DrillSimpleFunc{
+@FunctionTemplate(name = "cast${type.to?upper_case}",
+ scope = FunctionTemplate.FunctionScope.SIMPLE,
+ nulls = NullHandling.NULL_IF_NULL)
+public class Cast${type.from}${type.to} implements DrillSimpleFunc {
@Param ${type.from}Holder in;
@Output ${type.to}Holder out;
@@ -53,18 +55,13 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
public void eval() {
<#if (type.from.startsWith("Float") && type.to.endsWith("Int"))>
- boolean sign = (in.value < 0);
- in.value = java.lang.Math.abs(in.value);
${type.native} fractional = in.value % 1;
int digit = ((int) (fractional * 10));
int carry = 0;
- if (digit > 4) {
- carry = 1;
- }
- out.value = ((${type.explicit}) in.value) + carry;
- if (sign == true) {
- out.value *= -1;
+ if (java.lang.Math.abs(digit) > 4) {
+ carry = (int) java.lang.Math.signum(digit);
}
+ out.value = (${type.explicit}) (in.value + carry);
<#elseif type.explicit??>
out.value = (${type.explicit}) in.value;
<#else>
http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalInt.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalInt.java b/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalInt.java
index a13f0e7..37f35f3 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalInt.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalInt.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -46,7 +46,9 @@ import java.nio.ByteBuffer;
* This class is generated using freemarker and the ${.template_name} template.
*/
@SuppressWarnings("unused")
-@FunctionTemplate(name = "cast${type.to?upper_case}", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.NULL_IF_NULL)
+@FunctionTemplate(name = "cast${type.to?upper_case}",
+ scope = FunctionTemplate.FunctionScope.SIMPLE,
+ nulls = NullHandling.NULL_IF_NULL)
public class Cast${type.from}${type.to} implements DrillSimpleFunc {
@Param ${type.from}Holder in;
@@ -57,12 +59,10 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
public void eval() {
- int carry = (org.apache.drill.exec.util.DecimalUtility.getFirstFractionalDigit(in.value, in.scale) > 4) ? 1 : 0;
+ int carry = (org.apache.drill.exec.util.DecimalUtility.getFirstFractionalDigit(in.value, in.scale) > 4)
+ ? (int) java.lang.Math.signum(in.value) : 0;
// Assign the integer part of the decimal to the output holder
- out.value = java.lang.Math.abs((${type.javatype}) (org.apache.drill.exec.util.DecimalUtility.adjustScaleDivide(in.value, (int) in.scale))) + carry;
- if (in.value < 0) {
- out.value *= -1;
- }
+ out.value = (${type.javatype}) (org.apache.drill.exec.util.DecimalUtility.adjustScaleDivide(in.value, (int) in.scale) + carry);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalVarchar.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalVarchar.java b/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalVarchar.java
index 239ea28..e4c221e 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalVarchar.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalVarchar.java
@@ -70,22 +70,23 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
StringBuilder str = new StringBuilder();
- if (in.value < 0) {
+ ${type.javatype} value = in.value;
+ if (value < 0) {
// Negative value, add '-' to the string
str.append("-");
// Negate the number
- in.value *= -1;
+ value *= -1;
}
${type.javatype} separator = (${type.javatype}) org.apache.drill.exec.util.DecimalUtility.getPowerOfTen((int) in.scale);
- str.append(in.value / separator);
+ str.append(value / separator);
if (in.scale > 0) {
str.append(".");
- String fractionalPart = String.valueOf(in.value % separator);
+ String fractionalPart = String.valueOf(value % separator);
/* Since we are taking modulus to find fractional part,
* we will miss printing the leading zeroes in the fractional part
@@ -101,7 +102,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
*
* We missed the initial zeroes in the fractional part. Below logic accounts for this case
*/
- str.append(org.apache.drill.exec.util.DecimalUtility.toStringWithZeroes((in.value % separator), in.scale));
+ str.append(org.apache.drill.exec.util.DecimalUtility.toStringWithZeroes((value % separator), in.scale));
}
out.buffer = buffer;
http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java
index 0d50dd3..b3a7fe9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,11 +17,17 @@
*/
package org.apache.drill.exec.fn.impl;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.util.FileUtils;
import org.joda.time.DateTime;
import org.junit.Test;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+
public class TestCastFunctions extends BaseTestQuery {
@Test
@@ -78,4 +84,435 @@ public class TestCastFunctions extends BaseTestQuery {
.build()
.run();
}
+
+ @Test
+ public void testCastFloatToInt() throws Exception {
+ Map<Float, Integer> values = Maps.newHashMap();
+
+ values.put(0F, 0);
+ values.put(0.4F, 0);
+ values.put(-0.4F, 0);
+ values.put(0.5F, 1);
+ values.put(-0.5F, -1);
+ values.put(16777215F, 16777215);
+ values.put(1677721F + 0.4F, 1677721);
+ values.put(1677721F + 0.5F, 1677722);
+ values.put(-16777216F, -16777216);
+ values.put(-1677721 - 0.4F, -1677721);
+ values.put(-1677721 - 0.5F, -1677722);
+ values.put(Float.MAX_VALUE, Integer.MAX_VALUE);
+ values.put(-Float.MAX_VALUE, Integer.MIN_VALUE);
+ values.put(Float.MIN_VALUE, 0);
+
+ for (float value : values.keySet()) {
+ try {
+ test("create table dfs_test.tmp.table_with_float as\n" +
+ "(select cast(%1$s as float) c1 from (values(1)))", value);
+
+ testBuilder()
+ .sqlQuery("select cast(c1 as int) col1 from dfs_test.tmp.table_with_float")
+ .unOrdered()
+ .baselineColumns("col1")
+ .baselineValues(values.get(value))
+ .build()
+ .run();
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_float");
+ }
+ }
+ }
+
+ @Test
+ public void testCastIntToFloatAndDouble() throws Exception {
+ List<Integer> values = Lists.newArrayList();
+
+ values.add(0);
+ values.add(1);
+ values.add(-1);
+ values.add(16777215);
+ values.add(-16777216);
+ values.add(Integer.MAX_VALUE);
+ values.add(Integer.MIN_VALUE);
+
+ for (int value : values) {
+ try {
+ test("create table dfs_test.tmp.table_with_int as\n" +
+ "(select cast(%1$s as int) c1 from (values(1)))", value);
+
+ testBuilder()
+ .sqlQuery("select cast(c1 as float) col1,\n" +
+ "cast(c1 as double) col2\n" +
+ "from dfs_test.tmp.table_with_int")
+ .unOrdered()
+ .baselineColumns("col1", "col2")
+ .baselineValues((float) value, (double) value)
+ .build()
+ .run();
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_int");
+ }
+ }
+ }
+
+ @Test
+ public void testCastFloatToBigInt() throws Exception {
+ Map<Float, Long> values = Maps.newHashMap();
+
+ values.put(0F, 0L);
+ values.put(0.4F, 0L);
+ values.put(-0.4F, 0L);
+ values.put(0.5F, 1L);
+ values.put(-0.5F, -1L);
+ values.put(16777215F, 16777215L);
+ values.put(1677721F + 0.4F, 1677721L);
+ values.put(1677721F + 0.5F, 1677722L);
+ values.put(-16777216F, -16777216L);
+ values.put(-1677721 - 0.4F, -1677721L);
+ values.put(-1677721 - 0.5F, -1677722L);
+ values.put(Float.MAX_VALUE, Long.MAX_VALUE);
+ values.put(Long.MIN_VALUE * 2F, Long.MIN_VALUE);
+ values.put(Float.MIN_VALUE, 0L);
+
+ for (float value : values.keySet()) {
+ try {
+ test("create table dfs_test.tmp.table_with_float as\n" +
+ "(select cast(%1$s as float) c1 from (values(1)))", value);
+
+ testBuilder()
+ .sqlQuery("select cast(c1 as bigInt) col1 from dfs_test.tmp.table_with_float")
+ .unOrdered()
+ .baselineColumns("col1")
+ .baselineValues(values.get(value))
+ .build()
+ .run();
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_float");
+ }
+ }
+ }
+
+ @Test
+ public void testCastBigIntToFloatAndDouble() throws Exception {
+ List<Long> values = Lists.newArrayList();
+
+ values.add(0L);
+ values.add(1L);
+ values.add(-1L);
+ values.add(16777215L);
+ values.add(-16777216L);
+ values.add(9007199254740991L);
+ values.add(-9007199254740992L);
+ values.add(Long.MAX_VALUE);
+ values.add(Long.MIN_VALUE);
+
+ for (long value : values) {
+ try {
+ test("create table dfs_test.tmp.table_with_bigint as\n" +
+ "(select cast(%1$s as bigInt) c1 from (values(1)))", value);
+
+ testBuilder()
+ .sqlQuery("select cast(c1 as float) col1,\n" +
+ "cast(c1 as double) col2\n" +
+ "from dfs_test.tmp.table_with_bigint")
+ .unOrdered()
+ .baselineColumns("col1", "col2")
+ .baselineValues((float) value, (double) value)
+ .build()
+ .run();
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_bigint");
+ }
+ }
+ }
+
+ @Test
+ public void testCastDoubleToInt() throws Exception {
+ Map<Double, Integer> values = Maps.newHashMap();
+
+ values.put(0D, 0);
+ values.put(0.4, 0);
+ values.put(-0.4, 0);
+ values.put(0.5, 1);
+ values.put(-0.5, -1);
+ values.put((double) Integer.MAX_VALUE, Integer.MAX_VALUE);
+ values.put(Integer.MAX_VALUE + 0.4, Integer.MAX_VALUE);
+ values.put(Integer.MAX_VALUE + 0.5, Integer.MAX_VALUE);
+ values.put((double) Integer.MIN_VALUE, Integer.MIN_VALUE);
+ values.put(Integer.MIN_VALUE - 0.4, Integer.MIN_VALUE);
+ values.put(Integer.MIN_VALUE - 0.5, Integer.MIN_VALUE);
+ values.put(Double.MAX_VALUE, Integer.MAX_VALUE);
+ values.put(-Double.MAX_VALUE, Integer.MIN_VALUE);
+ values.put(Double.MIN_VALUE, 0);
+
+ for (double value : values.keySet()) {
+ try {
+ test("create table dfs_test.tmp.table_with_double as\n" +
+ "(select cast(%1$s as double) c1 from (values(1)))", value);
+
+ testBuilder()
+ .sqlQuery("select cast(c1 as int) col1 from dfs_test.tmp.table_with_double")
+ .unOrdered()
+ .baselineColumns("col1")
+ .baselineValues(values.get(value))
+ .build()
+ .run();
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_double");
+ }
+ }
+ }
+
+ @Test
+ public void testCastDoubleToBigInt() throws Exception {
+ Map<Double, Long> values = Maps.newHashMap();
+
+ values.put(0D, 0L);
+ values.put(0.4, 0L);
+ values.put(-0.4, 0L);
+ values.put(0.5, 1L);
+ values.put(-0.5, -1L);
+ values.put((double) Integer.MAX_VALUE, (long) Integer.MAX_VALUE);
+ values.put((double) 9007199254740991L, 9007199254740991L);
+ values.put(900719925474098L + 0.4, 900719925474098L);
+ values.put(900719925474098L + 0.5, 900719925474099L);
+ values.put((double) -9007199254740991L, -9007199254740991L);
+ values.put(-900719925474098L - 0.4, -900719925474098L);
+ values.put(-900719925474098L - 0.5, -900719925474099L);
+ values.put(Double.MAX_VALUE, Long.MAX_VALUE);
+ values.put(-Double.MAX_VALUE, Long.MIN_VALUE);
+ values.put(Double.MIN_VALUE, 0L);
+ for (double value : values.keySet()) {
+ try {
+ test("create table dfs_test.tmp.table_with_double as\n" +
+ "(select cast(%1$s as double) c1 from (values(1)))", value);
+
+ testBuilder()
+ .sqlQuery("select cast(c1 as bigInt) col1 from dfs_test.tmp.table_with_double")
+ .unOrdered()
+ .baselineColumns("col1")
+ .baselineValues(values.get(value))
+ .build()
+ .run();
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_double");
+ }
+ }
+ }
+
+ @Test
+ public void testCastIntAndBigInt() throws Exception {
+ List<Integer> values = Lists.newArrayList();
+
+ values.add(0);
+ values.add(1);
+ values.add(-1);
+ values.add(Integer.MAX_VALUE);
+ values.add(Integer.MIN_VALUE);
+ values.add(16777215);
+
+ for (int value : values) {
+ try {
+ test("create table dfs_test.tmp.table_with_int as\n" +
+ "(select cast(%1$s as int) c1, cast(%1$s as bigInt) c2 from (values(1)))", value);
+
+ testBuilder()
+ .sqlQuery("select cast(c1 as bigint) col1,\n" +
+ "cast(c1 as int) col2\n" +
+ "from dfs_test.tmp.table_with_int")
+ .unOrdered()
+ .baselineColumns("col1", "col2")
+ .baselineValues((long) value, value)
+ .build()
+ .run();
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_int");
+ }
+ }
+ }
+
+ @Test
+ public void testCastFloatAndDouble() throws Exception {
+ List<Double> values = Lists.newArrayList();
+
+ values.add(0d);
+ values.add(0.4);
+ values.add(-0.4);
+ values.add(0.5);
+ values.add(-0.5);
+ values.add(16777215d);
+ values.add(-16777216d);
+ values.add((double) Float.MAX_VALUE);
+ values.add(Double.MAX_VALUE);
+ values.add((double) Float.MIN_VALUE);
+ values.add(Double.MIN_VALUE);
+
+ for (double value : values) {
+ try {
+ test("create table dfs_test.tmp.table_with_float as\n" +
+ "(select cast(%1$s as float) c1,\n" +
+ "cast(%1$s as double) c2\n" +
+ "from (values(1)))", value);
+
+ testBuilder()
+ .sqlQuery("select cast(c1 as double) col1,\n" +
+ "cast(c2 as float) col2\n" +
+ "from dfs_test.tmp.table_with_float")
+ .unOrdered()
+ .baselineColumns("col1", "col2")
+ .baselineValues((double) ((float) (value)), (float) value)
+ .build()
+ .run();
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_float");
+ }
+ }
+ }
+
+ @Test
+ public void testCastIntAndBigIntToDecimal() throws Exception {
+ try {
+ test("alter session set planner.enable_decimal_data_type = true");
+
+ testBuilder()
+ .physicalPlanFromFile("decimal/cast_int_decimal.json")
+ .unOrdered()
+ .baselineColumns("DEC9_INT", "DEC38_INT", "DEC9_BIGINT", "DEC38_BIGINT")
+ .baselineValues(new BigDecimal(0), new BigDecimal(0), new BigDecimal(0), new BigDecimal(0))
+ .baselineValues(new BigDecimal(1), new BigDecimal(1), new BigDecimal(1), new BigDecimal(1))
+ .baselineValues(new BigDecimal(-1), new BigDecimal(-1), new BigDecimal(-1), new BigDecimal(-1))
+
+ .baselineValues(new BigDecimal(Integer.MAX_VALUE),
+ new BigDecimal(Integer.MAX_VALUE),
+ new BigDecimal((int) Long.MAX_VALUE),
+ new BigDecimal(Long.MAX_VALUE))
+
+ .baselineValues(new BigDecimal(Integer.MIN_VALUE),
+ new BigDecimal(Integer.MIN_VALUE),
+ new BigDecimal((int) Long.MIN_VALUE),
+ new BigDecimal(Long.MIN_VALUE))
+
+ .baselineValues(new BigDecimal(123456789),
+ new BigDecimal(123456789),
+ new BigDecimal(123456789),
+ new BigDecimal(123456789))
+ .build()
+ .run();
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_int");
+ test("alter session reset planner.enable_decimal_data_type");
+ }
+ }
+
+ @Test
+ public void testCastDecimalToIntAndBigInt() throws Exception {
+ try {
+ test("alter session set planner.enable_decimal_data_type = true");
+
+ testBuilder()
+ .physicalPlanFromFile("decimal/cast_decimal_int.json")
+ .unOrdered()
+ .baselineColumns("DEC9_INT", "DEC38_INT", "DEC9_BIGINT", "DEC38_BIGINT")
+ .baselineValues(0, 0, 0L, 0L)
+ .baselineValues(1, 1, 1L, 1L)
+ .baselineValues(-1, -1, -1L, -1L)
+ .baselineValues(Integer.MAX_VALUE, (int) Long.MAX_VALUE, (long) Integer.MAX_VALUE, Long.MAX_VALUE)
+ .baselineValues(Integer.MIN_VALUE, (int) Long.MIN_VALUE, (long) Integer.MIN_VALUE, Long.MIN_VALUE)
+ .baselineValues(123456789, 123456789, 123456789L, 123456789L)
+ .build()
+ .run();
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_int");
+ test("alter session reset planner.enable_decimal_data_type");
+ }
+ }
+
+ @Test
+ public void testCastDecimalToFloatAndDouble() throws Exception {
+ try {
+ test("alter session set planner.enable_decimal_data_type = true");
+
+ testBuilder()
+ .physicalPlanFromFile("decimal/cast_decimal_float.json")
+ .ordered()
+ .baselineColumns("DEC9_FLOAT", "DEC38_FLOAT", "DEC9_DOUBLE", "DEC38_DOUBLE")
+ .baselineValues(99f, 123456789f, 99d, 123456789d)
+ .baselineValues(11.1235f, 11.1235f, 11.1235, 11.1235)
+ .baselineValues(0.1000f, 0.1000f, 0.1000, 0.1000)
+ .baselineValues(-0.12f, -0.1004f, -0.12, -0.1004)
+ .baselineValues(-123.1234f, -987654321.1234567891f, -123.1234, -987654321.1235)
+ .baselineValues(-1.0001f, -2.0301f, -1.0001, -2.0301)
+ .build()
+ .run();
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_int");
+ test("alter session reset planner.enable_decimal_data_type");
+ }
+ }
+
+ @Test // DRILL-4970
+ public void testCastNegativeFloatToInt() throws Exception {
+ try {
+ test("create table dfs_test.tmp.table_with_float as\n" +
+ "(select cast(-255.0 as double) as double_col,\n" +
+ "cast(-255.0 as float) as float_col\n" +
+ "from (values(1)))");
+
+ final List<String> columnNames = Lists.newArrayList();
+ columnNames.add("float_col");
+ columnNames.add("double_col");
+
+ final List<String> castTypes = Lists.newArrayList();
+ castTypes.add("int");
+ castTypes.add("bigInt");
+
+ final String query = "select count(*) as c from dfs_test.tmp.table_with_float\n" +
+ "where (cast(%1$s as %2$s) >= -255 and (%1$s <= -5)) or (%1$s <= -256)";
+
+ for (String columnName : columnNames) {
+ for (String castType : castTypes) {
+ testBuilder()
+ .sqlQuery(query, columnName, castType)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(1L)
+ .build()
+ .run();
+ }
+ }
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_float");
+ }
+ }
+
+ @Test // DRILL-4970
+ public void testCastNegativeDecimalToVarChar() throws Exception {
+ try {
+ test("alter session set planner.enable_decimal_data_type = true");
+
+ test("create table dfs_test.tmp.table_with_decimal as" +
+ "(select cast(cast(manager_id as double) * (-1) as decimal(9, 0)) as decimal9_col,\n" +
+ "cast(cast(manager_id as double) * (-1) as decimal(18, 0)) as decimal18_col\n" +
+ "from cp.`parquet/fixedlenDecimal.parquet` limit 1)");
+
+ final List<String> columnNames = Lists.newArrayList();
+ columnNames.add("decimal9_col");
+ columnNames.add("decimal18_col");
+
+ final String query = "select count(*) as c from dfs_test.tmp.table_with_decimal\n" +
+ "where (cast(%1$s as varchar) = '-124' and (%1$s <= -5)) or (%1$s <= -256)";
+
+ for (String colName : columnNames) {
+ testBuilder()
+ .sqlQuery(query, colName)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(1L)
+ .build()
+ .run();
+ }
+ } finally {
+ test("drop table if exists dfs_test.tmp.table_with_decimal");
+ test("alter session reset planner.enable_decimal_data_type");
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/test/resources/decimal/cast_decimal_float.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/decimal/cast_decimal_float.json b/exec/java-exec/src/test/resources/decimal/cast_decimal_float.json
new file mode 100644
index 0000000..3fc94a0
--- /dev/null
+++ b/exec/java-exec/src/test/resources/decimal/cast_decimal_float.json
@@ -0,0 +1,45 @@
+{
+ "head" : {
+ "version" : 1,
+ "generator" : {
+ "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+ "info" : ""
+ },
+ "type" : "APACHE_DRILL_PHYSICAL",
+ "resultMode" : "EXEC"
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"fs-scan",
+ format: {type: "json"},
+ storage:{type: "file", connection: "classpath:///"},
+ files:["/input_simple_decimal.json"]
+ }, {
+ "pop" : "project",
+ "@id" : 2,
+ "exprs" : [
+ { "ref" : "DEC9_COL", "expr": "(cast(DEC9 as decimal9(9, 4)))" },
+ { "ref" : "DEC38_COL", "expr": "(cast(DEC18 as decimal38sparse(38, 4)))" }
+ ],
+
+ "child" : 1
+ },
+ {
+ "pop" : "project",
+ "@id" : 4,
+ "exprs" : [
+ {"ref": "DEC9_FLOAT", "expr" : "cast(DEC9_COL as float4)"},
+ {"ref": "DEC38_FLOAT", "expr" : "cast(DEC38_COL as float4)"},
+ {"ref": "DEC9_DOUBLE", "expr" : "cast(DEC9_COL as float8)"},
+ {"ref": "DEC38_DOUBLE", "expr" : "cast(DEC38_COL as float8)"}
+ ],
+
+ "child" : 2
+ },
+ {
+ "pop" : "screen",
+ "@id" : 5,
+ "child" : 4
+ } ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/test/resources/decimal/cast_decimal_int.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/decimal/cast_decimal_int.json b/exec/java-exec/src/test/resources/decimal/cast_decimal_int.json
new file mode 100644
index 0000000..2cca865
--- /dev/null
+++ b/exec/java-exec/src/test/resources/decimal/cast_decimal_int.json
@@ -0,0 +1,45 @@
+{
+ "head" : {
+ "version" : 1,
+ "generator" : {
+ "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+ "info" : ""
+ },
+ "type" : "APACHE_DRILL_PHYSICAL",
+ "resultMode" : "EXEC"
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"fs-scan",
+ format: {type: "json"},
+ storage:{type: "file", connection: "classpath:///"},
+ files:["/input_simple_decimal.json"]
+ }, {
+ "pop" : "project",
+ "@id" : 2,
+ "exprs" : [
+ { "ref" : "DEC9_COL", "expr": "(cast(cast(INT_COL as int) as decimal9(9, 0)))" },
+ { "ref" : "DEC38_COL", "expr": "(cast(BIGINT_COL as decimal38sparse(38, 0)))" }
+ ],
+
+ "child" : 1
+ },
+ {
+ "pop" : "project",
+ "@id" : 4,
+ "exprs" : [
+ {"ref": "DEC9_INT", "expr" : "cast(DEC9_COL as int)"},
+ {"ref": "DEC38_INT", "expr" : "cast(DEC38_COL as int)"},
+ {"ref": "DEC9_BIGINT", "expr" : "cast(DEC9_COL as bigint)"},
+ {"ref": "DEC38_BIGINT", "expr" : "cast(DEC38_COL as bigint)"}
+ ],
+
+ "child" : 2
+ },
+ {
+ "pop" : "screen",
+ "@id" : 5,
+ "child" : 4
+ } ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/test/resources/decimal/cast_int_decimal.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/decimal/cast_int_decimal.json b/exec/java-exec/src/test/resources/decimal/cast_int_decimal.json
new file mode 100644
index 0000000..885ff53
--- /dev/null
+++ b/exec/java-exec/src/test/resources/decimal/cast_int_decimal.json
@@ -0,0 +1,45 @@
+{
+ "head" : {
+ "version" : 1,
+ "generator" : {
+ "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+ "info" : ""
+ },
+ "type" : "APACHE_DRILL_PHYSICAL",
+ "resultMode" : "EXEC"
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"fs-scan",
+ format: {type: "json"},
+ storage:{type: "file", connection: "classpath:///"},
+ files:["/input_simple_decimal.json"]
+ }, {
+ "pop" : "project",
+ "@id" : 2,
+ "exprs" : [
+ { "ref" : "INT_COL", "expr": "(cast(INT_COL as int))" },
+ { "ref" : "BIGINT_COL", "expr": "(cast(BIGINT_COL as bigint))" }
+ ],
+
+ "child" : 1
+ },
+ {
+ "pop" : "project",
+ "@id" : 4,
+ "exprs" : [
+ {"ref": "DEC9_INT", "expr" : "cast(INT_COL as decimal9(9, 0))"},
+ {"ref": "DEC38_INT", "expr" : "cast(INT_COL as decimal38sparse(38, 0))"},
+ {"ref": "DEC9_BIGINT", "expr" : "cast(BIGINT_COL as decimal9(9, 0))"},
+ {"ref": "DEC38_BIGINT", "expr" : "cast(BIGINT_COL as decimal38sparse(38, 0))"}
+ ],
+
+ "child" : 2
+ },
+ {
+ "pop" : "screen",
+ "@id" : 5,
+ "child" : 4
+ } ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/test/resources/input_simple_decimal.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/input_simple_decimal.json b/exec/java-exec/src/test/resources/input_simple_decimal.json
index b7c9481..d3687db 100644
--- a/exec/java-exec/src/test/resources/input_simple_decimal.json
+++ b/exec/java-exec/src/test/resources/input_simple_decimal.json
@@ -1,24 +1,36 @@
{
"DEC9": "99",
-"DEC18": "123456789"
+"DEC18": "123456789",
+"INT_COL": "0",
+"BIGINT_COL": "0"
}
{
"DEC9": "11.1234567890123456",
-"DEC18": "11.123456789"
+"DEC18": "11.123456789",
+"INT_COL": "1",
+"BIGINT_COL": "1"
}
{
"DEC9": "0.100000000001",
-"DEC18":"0.100000000001"
+"DEC18":"0.100000000001",
+"INT_COL": "-1",
+"BIGINT_COL": "-1"
}
{
"DEC9": "-0.12",
-"DEC18":"-0.1004"
+"DEC18":"-0.1004",
+"INT_COL": "2147483647",
+"BIGINT_COL": "9223372036854775807"
}
{
"DEC9": "-123.1234",
-"DEC18": "-987654321.1234567891"
+"DEC18": "-987654321.1234567891",
+"INT_COL": "-2147483648",
+"BIGINT_COL": "-9223372036854775808"
}
{
"DEC9": "-1.0001",
-"DEC18":"-2.0301"
+"DEC18":"-2.0301",
+"INT_COL": "123456789",
+"BIGINT_COL": "123456789"
}
[2/3] drill git commit: DRILL-5420: ParquetAsyncPgReader goes into
infinite loop during cleanup
Posted by pr...@apache.org.
DRILL-5420: ParquetAsyncPgReader goes into infinite loop during cleanup
PageQueue is now cleaned up using poll() instead of take(), because take() blocks and constantly gets interrupted during cleanup, causing CPU churn.
During a ColumnReader shutdown, a flag is set to block any new page-reading tasks from being submitted.
closes #862
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0e1e6042
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0e1e6042
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0e1e6042
Branch: refs/heads/master
Commit: 0e1e6042f507ac0da2a21d7de3bbed4c0dac549a
Parents: 0b830ef
Author: Kunal Khatua <kk...@maprtech.com>
Authored: Wed Jun 28 17:39:08 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Jul 3 18:02:11 2017 -0700
----------------------------------------------------------------------
.../parquet/columnreaders/AsyncPageReader.java | 21 +++++++++++++-------
.../parquet/columnreaders/BatchReader.java | 4 +++-
.../parquet/columnreaders/ColumnReader.java | 11 +++++++---
.../columnreaders/VarLenBinaryReader.java | 4 +++-
4 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 926436c..8c89e3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -130,7 +130,10 @@ class AsyncPageReader extends PageReader {
@Override protected void init() throws IOException {
super.init();
- asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+ //Avoid Init if a shutdown is already in progress even if init() is called once
+ if (!parentColumnReader.isShuttingDown) {
+ asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+ }
}
private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
@@ -224,7 +227,7 @@ class AsyncPageReader extends PageReader {
}
//if the queue was full before we took a page out, then there would
// have been no new read tasks scheduled. In that case, schedule a new read.
- if (pageQueueFull) {
+ if (!parentColumnReader.isShuttingDown && pageQueueFull) {
asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
}
}
@@ -256,7 +259,7 @@ class AsyncPageReader extends PageReader {
}
//if the queue was full before we took a page out, then there would
// have been no new read tasks scheduled. In that case, schedule a new read.
- if (pageQueueFull) {
+ if (!parentColumnReader.isShuttingDown && pageQueueFull) {
asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
}
}
@@ -278,6 +281,7 @@ class AsyncPageReader extends PageReader {
}
@Override public void clear() {
+ //Cancelling all existing AsyncPageReaderTasks
while (asyncPageRead != null && !asyncPageRead.isEmpty()) {
try {
Future<Void> f = asyncPageRead.poll();
@@ -298,12 +302,13 @@ class AsyncPageReader extends PageReader {
while (!pageQueue.isEmpty()) {
r = null;
try {
- r = pageQueue.take();
+ r = pageQueue.poll();
if (r == ReadStatus.EMPTY) {
break;
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ //Reporting because we shouldn't get this
+ logger.error(e.getMessage());
} finally {
if (r != null && r.pageData != null) {
r.pageData.release();
@@ -412,6 +417,7 @@ class AsyncPageReader extends PageReader {
try {
PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
int compressedSize = pageHeader.getCompressed_page_size();
+ if ( parent.parentColumnReader.isShuttingDown ) { return null; } //Opportunity to skip expensive Parquet processing
pageData = parent.dataReader.getNext(compressedSize);
bytesRead = compressedSize;
@@ -438,7 +444,7 @@ class AsyncPageReader extends PageReader {
queue.put(readStatus);
// if the queue is not full, schedule another read task immediately. If it is then the consumer
// will schedule a new read task as soon as it removes a page from the queue.
- if (queue.remainingCapacity() > 0) {
+ if (!parentColumnReader.isShuttingDown && queue.remainingCapacity() > 0) {
asyncPageRead.offer(parent.threadPool.submit(new AsyncPageReaderTask(debugName, queue)));
}
}
@@ -454,6 +460,7 @@ class AsyncPageReader extends PageReader {
}
parent.handleAndThrowException(e, "Exception occurred while reading from disk.");
} finally {
+ //Nothing to do if isShuttingDown.
}
return null;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
index 651c813..367b226 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
@@ -68,7 +68,9 @@ public abstract class BatchReader {
ArrayList<Future<Long>> futures = Lists.newArrayList();
for (ColumnReader<?> crs : readState.getColumnReaders()) {
Future<Long> f = crs.processPagesAsync(recordsToRead);
- futures.add(f);
+ if (f != null) {
+ futures.add(f);
+ }
}
Exception exception = null;
for(Future<Long> f: futures){
http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index 2d8f556..8b6e7a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -81,6 +81,8 @@ public abstract class ColumnReader<V extends ValueVector> {
long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
private ExecutorService threadPool;
+ volatile boolean isShuttingDown; //Indicate to not submit any new AsyncPageReader Tasks during clear()
+
protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
this.parentReader = parentReader;
@@ -125,7 +127,7 @@ public abstract class ColumnReader<V extends ValueVector> {
}
public Future<Long> processPagesAsync(long recordsToReadInThisPass){
- Future<Long> r = threadPool.submit(new ColumnReaderProcessPagesTask(recordsToReadInThisPass));
+ Future<Long> r = (isShuttingDown ? null : threadPool.submit(new ColumnReaderProcessPagesTask(recordsToReadInThisPass)));
return r;
}
@@ -143,6 +145,9 @@ public abstract class ColumnReader<V extends ValueVector> {
}
public void clear() {
+ //State to indicate no more tasks to be scheduled
+ isShuttingDown = true;
+
valueVec.clear();
pageReader.clear();
}
@@ -190,8 +195,8 @@ public abstract class ColumnReader<V extends ValueVector> {
return checkVectorCapacityReached();
}
- protected Future<Integer> readRecordsAsync(int recordsToRead){
- Future<Integer> r = threadPool.submit(new ColumnReaderReadRecordsTask(recordsToRead));
+ protected Future<Integer> readRecordsAsync(int recordsToRead) {
+ Future<Integer> r = (isShuttingDown ? null : threadPool.submit(new ColumnReaderReadRecordsTask(recordsToRead)));
return r;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index b598ac8..900348f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -119,7 +119,9 @@ public class VarLenBinaryReader {
ArrayList<Future<Integer>> futures = Lists.newArrayList();
for (VarLengthColumn<?> columnReader : columns) {
Future<Integer> f = columnReader.readRecordsAsync(columnReader.pageReader.valuesReadyToRead);
- futures.add(f);
+ if (f != null) {
+ futures.add(f);
+ }
}
Exception exception = null;
for(Future<Integer> f: futures){