You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/07/04 03:00:10 UTC

[1/3] drill git commit: DRILL-4722: Fix EqualityVisitor for interval day expressions with millis

Repository: drill
Updated Branches:
  refs/heads/master f5b975adf -> 35a1ec667


DRILL-4722: Fix EqualityVisitor for interval day expressions with millis

closes #861


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0b830ef5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0b830ef5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0b830ef5

Branch: refs/heads/master
Commit: 0b830ef5370ad865c13da64121cd8a0e366dfd7b
Parents: f5b975a
Author: Volodymyr Vysotskyi <vv...@gmail.com>
Authored: Mon Jun 26 18:12:34 2017 +0000
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Jul 3 18:00:20 2017 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/expr/EqualityVisitor.java |  6 +-
 .../exec/fn/impl/TestDateAddFunctions.java      | 79 ++++++++++++++++++++
 2 files changed, 82 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0b830ef5/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EqualityVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EqualityVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EqualityVisitor.java
index 5f79f32..44ff78c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EqualityVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EqualityVisitor.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -44,7 +44,6 @@ import org.apache.drill.common.expression.ValueExpressions.QuotedString;
 import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
 import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
 
 import java.util.List;
 
@@ -169,7 +168,8 @@ class EqualityVisitor extends AbstractExprVisitor<Boolean,LogicalExpression,Runt
     if (!(value instanceof IntervalDayExpression)) {
       return false;
     }
-    return intExpr.getIntervalDay() == ((IntervalDayExpression) value).getIntervalDay();
+    return intExpr.getIntervalDay() == ((IntervalDayExpression) value).getIntervalDay()
+            && intExpr.getIntervalMillis() == ((IntervalDayExpression) value).getIntervalMillis();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/0b830ef5/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestDateAddFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestDateAddFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestDateAddFunctions.java
new file mode 100644
index 0000000..7a5a49a
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestDateAddFunctions.java
@@ -0,0 +1,79 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to you under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.drill.exec.fn.impl;
+
+import org.apache.drill.BaseTestQuery;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+public class TestDateAddFunctions extends BaseTestQuery {
+
+  @Test
+  public void testDateAddIntervalDay() throws Exception {
+    String query = "select date_add(timestamp '2015-01-24 07:27:05.0', interval '3' day) as col1,\n" +
+                          "date_add(timestamp '2015-01-24 07:27:05.0', interval '5' day) as col2,\n" +
+                          "date_add(timestamp '2015-01-24 07:27:05.0', interval '5' hour) as col3,\n" +
+                          "date_add(timestamp '2015-01-24 07:27:05.0', interval '5' minute) as col4,\n" +
+                          "date_add(timestamp '2015-01-24 07:27:05.0', interval '5' second) as col5,\n" +
+                          "date_add(timestamp '2015-01-24 07:27:05.0', interval '5 10:20:30' day to second) as col6\n" +
+                   "from (values(1))";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("col1", "col2", "col3", "col4", "col5", "col6")
+      .baselineValues(DateTime.parse("2015-01-27T07:27:05.0"),
+                      DateTime.parse("2015-01-29T07:27:05.0"),
+                      DateTime.parse("2015-01-24T12:27:05.0"),
+                      DateTime.parse("2015-01-24T07:32:05.0"),
+                      DateTime.parse("2015-01-24T07:27:10.0"),
+                      DateTime.parse("2015-01-29T17:47:35.0"))
+      .go();
+  }
+
+  @Test
+  public void testDateAddIntervalYear() throws Exception {
+    String query = "select date_add(date '2015-01-24', interval '3' month) as col1,\n" +
+                          "date_add(date '2015-01-24', interval '5' month) as col2,\n" +
+                          "date_add(date '2015-01-24', interval '5' year) as col3\n" +
+                   "from (values(1))";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("col1", "col2", "col3")
+      .baselineValues(DateTime.parse("2015-04-24"),
+                      DateTime.parse("2015-06-24"),
+                      DateTime.parse("2020-01-24"))
+      .go();
+  }
+
+  @Test
+  public void testDateAddIntegerAsDay() throws Exception {
+    String query = "select date_add(date '2015-01-24', 3) as col1,\n" +
+                          "date_add(date '2015-01-24', 5) as col2\n" +
+                   "from (values(1))";
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("col1", "col2")
+      .baselineValues(DateTime.parse("2015-01-27"),
+                      DateTime.parse("2015-01-29"))
+      .go();
+  }
+}


[3/3] drill git commit: DRILL-4970: Prevent changing the negative value of input holder for cast functions

Posted by pr...@apache.org.
DRILL-4970: Prevent changing the negative value of input holder for cast functions

closes #863


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/35a1ec66
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/35a1ec66
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/35a1ec66

Branch: refs/heads/master
Commit: 35a1ec66726fc5a77ea61cac3b6f3bbc33b8564a
Parents: 0e1e604
Author: Volodymyr Vysotskyi <vv...@gmail.com>
Authored: Thu Jun 29 15:28:05 2017 +0000
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Jul 3 18:03:35 2017 -0700

----------------------------------------------------------------------
 .../main/codegen/templates/CastFunctions.java   |  19 +-
 .../templates/Decimal/CastDecimalInt.java       |  14 +-
 .../templates/Decimal/CastDecimalVarchar.java   |  11 +-
 .../drill/exec/fn/impl/TestCastFunctions.java   | 439 ++++++++++++++++++-
 .../resources/decimal/cast_decimal_float.json   |  45 ++
 .../resources/decimal/cast_decimal_int.json     |  45 ++
 .../resources/decimal/cast_int_decimal.json     |  45 ++
 .../test/resources/input_simple_decimal.json    |  24 +-
 8 files changed, 612 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/main/codegen/templates/CastFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/CastFunctions.java b/exec/java-exec/src/main/codegen/templates/CastFunctions.java
index f68da06..4a1f67f 100644
--- a/exec/java-exec/src/main/codegen/templates/CastFunctions.java
+++ b/exec/java-exec/src/main/codegen/templates/CastFunctions.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -43,8 +43,10 @@ import org.apache.drill.exec.record.RecordBatch;
  */
 
 @SuppressWarnings("unused")
-@FunctionTemplate(name = "cast${type.to?upper_case}", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.NULL_IF_NULL)
-public class Cast${type.from}${type.to} implements DrillSimpleFunc{
+@FunctionTemplate(name = "cast${type.to?upper_case}",
+                  scope = FunctionTemplate.FunctionScope.SIMPLE,
+                  nulls = NullHandling.NULL_IF_NULL)
+public class Cast${type.from}${type.to} implements DrillSimpleFunc {
 
   @Param ${type.from}Holder in;
   @Output ${type.to}Holder out;
@@ -53,18 +55,13 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc{
 
   public void eval() {
     <#if (type.from.startsWith("Float") && type.to.endsWith("Int"))>
-    boolean sign = (in.value < 0);
-    in.value = java.lang.Math.abs(in.value);
     ${type.native} fractional = in.value % 1;
     int digit = ((int) (fractional * 10));
     int carry = 0;
-    if (digit > 4) {
-      carry = 1;
-    }
-    out.value = ((${type.explicit}) in.value) + carry;
-    if (sign == true) {
-      out.value *= -1;
+    if (java.lang.Math.abs(digit) > 4) {
+      carry = (int) java.lang.Math.signum(digit);
     }
+    out.value = (${type.explicit}) (in.value + carry);
     <#elseif type.explicit??>
     out.value = (${type.explicit}) in.value;
     <#else>

http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalInt.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalInt.java b/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalInt.java
index a13f0e7..37f35f3 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalInt.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalInt.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -46,7 +46,9 @@ import java.nio.ByteBuffer;
  * This class is generated using freemarker and the ${.template_name} template.
  */
 @SuppressWarnings("unused")
-@FunctionTemplate(name = "cast${type.to?upper_case}", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.NULL_IF_NULL)
+@FunctionTemplate(name = "cast${type.to?upper_case}",
+                  scope = FunctionTemplate.FunctionScope.SIMPLE,
+                  nulls = NullHandling.NULL_IF_NULL)
 public class Cast${type.from}${type.to} implements DrillSimpleFunc {
 
 @Param ${type.from}Holder in;
@@ -57,12 +59,10 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
 
     public void eval() {
 
-        int carry = (org.apache.drill.exec.util.DecimalUtility.getFirstFractionalDigit(in.value, in.scale) > 4) ? 1 : 0;
+        int carry = (org.apache.drill.exec.util.DecimalUtility.getFirstFractionalDigit(in.value, in.scale) > 4)
+                    ? (int) java.lang.Math.signum(in.value) : 0;
         // Assign the integer part of the decimal to the output holder
-        out.value = java.lang.Math.abs((${type.javatype}) (org.apache.drill.exec.util.DecimalUtility.adjustScaleDivide(in.value, (int) in.scale))) + carry;
-        if (in.value < 0) {
-          out.value *= -1;
-        }
+        out.value = (${type.javatype}) (org.apache.drill.exec.util.DecimalUtility.adjustScaleDivide(in.value, (int) in.scale) + carry);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalVarchar.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalVarchar.java b/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalVarchar.java
index 239ea28..e4c221e 100644
--- a/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalVarchar.java
+++ b/exec/java-exec/src/main/codegen/templates/Decimal/CastDecimalVarchar.java
@@ -70,22 +70,23 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
 
         StringBuilder str = new StringBuilder();
 
-        if (in.value < 0) {
+        ${type.javatype} value = in.value;
+        if (value < 0) {
             // Negative value, add '-' to the string
             str.append("-");
 
             // Negate the number
-            in.value *= -1;
+            value *= -1;
         }
 
         ${type.javatype} separator = (${type.javatype}) org.apache.drill.exec.util.DecimalUtility.getPowerOfTen((int) in.scale);
 
-        str.append(in.value / separator);
+        str.append(value / separator);
 
         if (in.scale > 0) {
             str.append(".");
 
-            String fractionalPart = String.valueOf(in.value % separator);
+            String fractionalPart = String.valueOf(value % separator);
 
             /* Since we are taking modulus to find fractional part,
              * we will miss printing the leading zeroes in the fractional part
@@ -101,7 +102,7 @@ public class Cast${type.from}${type.to} implements DrillSimpleFunc {
              *
              * We missed the initial zeroes in the fractional part. Below logic accounts for this case
              */
-            str.append(org.apache.drill.exec.util.DecimalUtility.toStringWithZeroes((in.value % separator), in.scale));
+            str.append(org.apache.drill.exec.util.DecimalUtility.toStringWithZeroes((value % separator), in.scale));
         }
 
         out.buffer = buffer;

http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java
index 0d50dd3..b3a7fe9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestCastFunctions.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,11 +17,17 @@
  */
 package org.apache.drill.exec.fn.impl;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.util.FileUtils;
 import org.joda.time.DateTime;
 import org.junit.Test;
 
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+
 public class TestCastFunctions extends BaseTestQuery {
 
   @Test
@@ -78,4 +84,435 @@ public class TestCastFunctions extends BaseTestQuery {
         .build()
         .run();
   }
+
+  @Test
+  public void testCastFloatToInt() throws Exception {
+    Map<Float, Integer> values = Maps.newHashMap();
+
+    values.put(0F, 0);
+    values.put(0.4F, 0);
+    values.put(-0.4F, 0);
+    values.put(0.5F, 1);
+    values.put(-0.5F, -1);
+    values.put(16777215F, 16777215);
+    values.put(1677721F + 0.4F, 1677721);
+    values.put(1677721F + 0.5F, 1677722);
+    values.put(-16777216F, -16777216);
+    values.put(-1677721 - 0.4F, -1677721);
+    values.put(-1677721 - 0.5F, -1677722);
+    values.put(Float.MAX_VALUE, Integer.MAX_VALUE);
+    values.put(-Float.MAX_VALUE, Integer.MIN_VALUE);
+    values.put(Float.MIN_VALUE, 0);
+
+    for (float value : values.keySet()) {
+      try {
+        test("create table dfs_test.tmp.table_with_float as\n" +
+              "(select cast(%1$s as float) c1 from (values(1)))", value);
+
+        testBuilder()
+          .sqlQuery("select cast(c1 as int) col1 from dfs_test.tmp.table_with_float")
+          .unOrdered()
+          .baselineColumns("col1")
+          .baselineValues(values.get(value))
+          .build()
+          .run();
+      } finally {
+        test("drop table if exists dfs_test.tmp.table_with_float");
+      }
+    }
+  }
+
+  @Test
+  public void testCastIntToFloatAndDouble() throws Exception {
+    List<Integer> values = Lists.newArrayList();
+
+    values.add(0);
+    values.add(1);
+    values.add(-1);
+    values.add(16777215);
+    values.add(-16777216);
+    values.add(Integer.MAX_VALUE);
+    values.add(Integer.MIN_VALUE);
+
+    for (int value : values) {
+      try {
+        test("create table dfs_test.tmp.table_with_int as\n" +
+              "(select cast(%1$s as int) c1 from (values(1)))", value);
+
+        testBuilder()
+          .sqlQuery("select cast(c1 as float) col1,\n" +
+                            "cast(c1 as double) col2\n" +
+                    "from dfs_test.tmp.table_with_int")
+          .unOrdered()
+          .baselineColumns("col1", "col2")
+          .baselineValues((float) value, (double) value)
+          .build()
+          .run();
+      } finally {
+        test("drop table if exists dfs_test.tmp.table_with_int");
+      }
+    }
+  }
+
+  @Test
+  public void testCastFloatToBigInt() throws Exception {
+    Map<Float, Long> values = Maps.newHashMap();
+
+    values.put(0F, 0L);
+    values.put(0.4F, 0L);
+    values.put(-0.4F, 0L);
+    values.put(0.5F, 1L);
+    values.put(-0.5F, -1L);
+    values.put(16777215F, 16777215L);
+    values.put(1677721F + 0.4F, 1677721L);
+    values.put(1677721F + 0.5F, 1677722L);
+    values.put(-16777216F, -16777216L);
+    values.put(-1677721 - 0.4F, -1677721L);
+    values.put(-1677721 - 0.5F, -1677722L);
+    values.put(Float.MAX_VALUE, Long.MAX_VALUE);
+    values.put(Long.MIN_VALUE * 2F, Long.MIN_VALUE);
+    values.put(Float.MIN_VALUE, 0L);
+
+    for (float value : values.keySet()) {
+      try {
+        test("create table dfs_test.tmp.table_with_float as\n" +
+              "(select cast(%1$s as float) c1 from (values(1)))", value);
+
+        testBuilder()
+          .sqlQuery("select cast(c1 as bigInt) col1 from dfs_test.tmp.table_with_float")
+          .unOrdered()
+          .baselineColumns("col1")
+          .baselineValues(values.get(value))
+          .build()
+          .run();
+      } finally {
+        test("drop table if exists dfs_test.tmp.table_with_float");
+      }
+    }
+  }
+
+  @Test
+  public void testCastBigIntToFloatAndDouble() throws Exception {
+    List<Long> values = Lists.newArrayList();
+
+    values.add(0L);
+    values.add(1L);
+    values.add(-1L);
+    values.add(16777215L);
+    values.add(-16777216L);
+    values.add(9007199254740991L);
+    values.add(-9007199254740992L);
+    values.add(Long.MAX_VALUE);
+    values.add(Long.MIN_VALUE);
+
+    for (long value : values) {
+      try {
+        test("create table dfs_test.tmp.table_with_bigint as\n" +
+              "(select cast(%1$s as bigInt) c1 from (values(1)))", value);
+
+        testBuilder()
+          .sqlQuery("select cast(c1 as float) col1,\n" +
+                            "cast(c1 as double) col2\n" +
+                    "from dfs_test.tmp.table_with_bigint")
+          .unOrdered()
+          .baselineColumns("col1", "col2")
+          .baselineValues((float) value, (double) value)
+          .build()
+          .run();
+      } finally {
+        test("drop table if exists dfs_test.tmp.table_with_bigint");
+      }
+    }
+  }
+
+  @Test
+  public void testCastDoubleToInt() throws Exception {
+    Map<Double, Integer> values = Maps.newHashMap();
+
+    values.put(0D, 0);
+    values.put(0.4, 0);
+    values.put(-0.4, 0);
+    values.put(0.5, 1);
+    values.put(-0.5, -1);
+    values.put((double) Integer.MAX_VALUE, Integer.MAX_VALUE);
+    values.put(Integer.MAX_VALUE + 0.4, Integer.MAX_VALUE);
+    values.put(Integer.MAX_VALUE + 0.5, Integer.MAX_VALUE);
+    values.put((double) Integer.MIN_VALUE, Integer.MIN_VALUE);
+    values.put(Integer.MIN_VALUE - 0.4, Integer.MIN_VALUE);
+    values.put(Integer.MIN_VALUE - 0.5, Integer.MIN_VALUE);
+    values.put(Double.MAX_VALUE, Integer.MAX_VALUE);
+    values.put(-Double.MAX_VALUE, Integer.MIN_VALUE);
+    values.put(Double.MIN_VALUE, 0);
+
+    for (double value : values.keySet()) {
+      try {
+        test("create table dfs_test.tmp.table_with_double as\n" +
+              "(select cast(%1$s as double) c1 from (values(1)))", value);
+
+        testBuilder()
+          .sqlQuery("select cast(c1 as int) col1 from dfs_test.tmp.table_with_double")
+          .unOrdered()
+          .baselineColumns("col1")
+          .baselineValues(values.get(value))
+          .build()
+          .run();
+      } finally {
+        test("drop table if exists dfs_test.tmp.table_with_double");
+      }
+    }
+  }
+
+  @Test
+  public void testCastDoubleToBigInt() throws Exception {
+    Map<Double, Long> values = Maps.newHashMap();
+
+    values.put(0D, 0L);
+    values.put(0.4, 0L);
+    values.put(-0.4, 0L);
+    values.put(0.5, 1L);
+    values.put(-0.5, -1L);
+    values.put((double) Integer.MAX_VALUE, (long) Integer.MAX_VALUE);
+    values.put((double) 9007199254740991L, 9007199254740991L);
+    values.put(900719925474098L + 0.4, 900719925474098L);
+    values.put(900719925474098L + 0.5, 900719925474099L);
+    values.put((double) -9007199254740991L, -9007199254740991L);
+    values.put(-900719925474098L - 0.4, -900719925474098L);
+    values.put(-900719925474098L - 0.5, -900719925474099L);
+    values.put(Double.MAX_VALUE, Long.MAX_VALUE);
+    values.put(-Double.MAX_VALUE, Long.MIN_VALUE);
+    values.put(Double.MIN_VALUE, 0L);
+    for (double value : values.keySet()) {
+      try {
+        test("create table dfs_test.tmp.table_with_double as\n" +
+              "(select cast(%1$s as double) c1 from (values(1)))", value);
+
+        testBuilder()
+          .sqlQuery("select cast(c1 as bigInt) col1 from dfs_test.tmp.table_with_double")
+          .unOrdered()
+          .baselineColumns("col1")
+          .baselineValues(values.get(value))
+          .build()
+          .run();
+      } finally {
+        test("drop table if exists dfs_test.tmp.table_with_double");
+      }
+    }
+  }
+
+  @Test
+  public void testCastIntAndBigInt() throws Exception {
+    List<Integer> values = Lists.newArrayList();
+
+    values.add(0);
+    values.add(1);
+    values.add(-1);
+    values.add(Integer.MAX_VALUE);
+    values.add(Integer.MIN_VALUE);
+    values.add(16777215);
+
+    for (int value : values) {
+      try {
+        test("create table dfs_test.tmp.table_with_int as\n" +
+              "(select cast(%1$s as int) c1, cast(%1$s as bigInt) c2 from (values(1)))", value);
+
+        testBuilder()
+          .sqlQuery("select cast(c1 as bigint) col1,\n" +
+                            "cast(c1 as int) col2\n" +
+                    "from dfs_test.tmp.table_with_int")
+          .unOrdered()
+          .baselineColumns("col1", "col2")
+          .baselineValues((long) value, value)
+          .build()
+          .run();
+      } finally {
+        test("drop table if exists dfs_test.tmp.table_with_int");
+      }
+    }
+  }
+
+  @Test
+  public void testCastFloatAndDouble() throws Exception {
+    List<Double> values = Lists.newArrayList();
+
+    values.add(0d);
+    values.add(0.4);
+    values.add(-0.4);
+    values.add(0.5);
+    values.add(-0.5);
+    values.add(16777215d);
+    values.add(-16777216d);
+    values.add((double) Float.MAX_VALUE);
+    values.add(Double.MAX_VALUE);
+    values.add((double) Float.MIN_VALUE);
+    values.add(Double.MIN_VALUE);
+
+    for (double value : values) {
+      try {
+        test("create table dfs_test.tmp.table_with_float as\n" +
+              "(select cast(%1$s as float) c1,\n" +
+                      "cast(%1$s as double) c2\n" +
+              "from (values(1)))", value);
+
+        testBuilder()
+          .sqlQuery("select cast(c1 as double) col1,\n" +
+                            "cast(c2 as float) col2\n" +
+                    "from dfs_test.tmp.table_with_float")
+          .unOrdered()
+          .baselineColumns("col1", "col2")
+          .baselineValues((double) ((float) (value)), (float) value)
+          .build()
+          .run();
+      } finally {
+        test("drop table if exists dfs_test.tmp.table_with_float");
+      }
+    }
+  }
+
+  @Test
+  public void testCastIntAndBigIntToDecimal() throws Exception {
+      try {
+        test("alter session set planner.enable_decimal_data_type = true");
+
+        testBuilder()
+          .physicalPlanFromFile("decimal/cast_int_decimal.json")
+          .unOrdered()
+          .baselineColumns("DEC9_INT", "DEC38_INT", "DEC9_BIGINT", "DEC38_BIGINT")
+          .baselineValues(new BigDecimal(0), new BigDecimal(0), new BigDecimal(0), new BigDecimal(0))
+          .baselineValues(new BigDecimal(1), new BigDecimal(1), new BigDecimal(1), new BigDecimal(1))
+          .baselineValues(new BigDecimal(-1), new BigDecimal(-1), new BigDecimal(-1), new BigDecimal(-1))
+
+          .baselineValues(new BigDecimal(Integer.MAX_VALUE),
+                          new BigDecimal(Integer.MAX_VALUE),
+                          new BigDecimal((int) Long.MAX_VALUE),
+                          new BigDecimal(Long.MAX_VALUE))
+
+          .baselineValues(new BigDecimal(Integer.MIN_VALUE),
+                          new BigDecimal(Integer.MIN_VALUE),
+                          new BigDecimal((int) Long.MIN_VALUE),
+                          new BigDecimal(Long.MIN_VALUE))
+
+          .baselineValues(new BigDecimal(123456789),
+                          new BigDecimal(123456789),
+                          new BigDecimal(123456789),
+                          new BigDecimal(123456789))
+          .build()
+          .run();
+      } finally {
+        test("drop table if exists dfs_test.tmp.table_with_int");
+        test("alter session reset planner.enable_decimal_data_type");
+      }
+  }
+
+  @Test
+  public void testCastDecimalToIntAndBigInt() throws Exception {
+    try {
+      test("alter session set planner.enable_decimal_data_type = true");
+
+      testBuilder()
+        .physicalPlanFromFile("decimal/cast_decimal_int.json")
+        .unOrdered()
+        .baselineColumns("DEC9_INT", "DEC38_INT", "DEC9_BIGINT", "DEC38_BIGINT")
+        .baselineValues(0, 0, 0L, 0L)
+        .baselineValues(1, 1, 1L, 1L)
+        .baselineValues(-1, -1, -1L, -1L)
+        .baselineValues(Integer.MAX_VALUE, (int) Long.MAX_VALUE, (long) Integer.MAX_VALUE, Long.MAX_VALUE)
+        .baselineValues(Integer.MIN_VALUE, (int) Long.MIN_VALUE, (long) Integer.MIN_VALUE, Long.MIN_VALUE)
+        .baselineValues(123456789, 123456789, 123456789L, 123456789L)
+        .build()
+        .run();
+    } finally {
+      test("drop table if exists dfs_test.tmp.table_with_int");
+      test("alter session reset planner.enable_decimal_data_type");
+    }
+  }
+
+  @Test
+  public void testCastDecimalToFloatAndDouble() throws Exception {
+    try {
+      test("alter session set planner.enable_decimal_data_type = true");
+
+      testBuilder()
+        .physicalPlanFromFile("decimal/cast_decimal_float.json")
+        .ordered()
+        .baselineColumns("DEC9_FLOAT", "DEC38_FLOAT", "DEC9_DOUBLE", "DEC38_DOUBLE")
+        .baselineValues(99f, 123456789f, 99d, 123456789d)
+        .baselineValues(11.1235f, 11.1235f, 11.1235, 11.1235)
+        .baselineValues(0.1000f, 0.1000f, 0.1000, 0.1000)
+        .baselineValues(-0.12f, -0.1004f, -0.12, -0.1004)
+        .baselineValues(-123.1234f, -987654321.1234567891f, -123.1234, -987654321.1235)
+        .baselineValues(-1.0001f, -2.0301f, -1.0001, -2.0301)
+        .build()
+        .run();
+    } finally {
+      test("drop table if exists dfs_test.tmp.table_with_int");
+      test("alter session reset planner.enable_decimal_data_type");
+    }
+  }
+
+  @Test // DRILL-4970
+  public void testCastNegativeFloatToInt() throws Exception {
+    try {
+      test("create table dfs_test.tmp.table_with_float as\n" +
+              "(select cast(-255.0 as double) as double_col,\n" +
+                      "cast(-255.0 as float) as float_col\n" +
+              "from (values(1)))");
+
+      final List<String> columnNames = Lists.newArrayList();
+      columnNames.add("float_col");
+      columnNames.add("double_col");
+
+      final List<String> castTypes = Lists.newArrayList();
+      castTypes.add("int");
+      castTypes.add("bigInt");
+
+      final String query = "select count(*) as c from dfs_test.tmp.table_with_float\n" +
+                            "where (cast(%1$s as %2$s) >= -255 and (%1$s <= -5)) or (%1$s <= -256)";
+
+      for (String columnName : columnNames) {
+        for (String castType : castTypes) {
+          testBuilder()
+            .sqlQuery(query, columnName, castType)
+            .unOrdered()
+            .baselineColumns("c")
+            .baselineValues(1L)
+            .build()
+            .run();
+        }
+      }
+    } finally {
+      test("drop table if exists dfs_test.tmp.table_with_float");
+    }
+  }
+
+  @Test // DRILL-4970
+  public void testCastNegativeDecimalToVarChar() throws Exception {
+    try {
+      test("alter session set planner.enable_decimal_data_type = true");
+
+      test("create table dfs_test.tmp.table_with_decimal as" +
+              "(select cast(cast(manager_id as double) * (-1) as decimal(9, 0)) as decimal9_col,\n" +
+                      "cast(cast(manager_id as double) * (-1) as decimal(18, 0)) as decimal18_col\n" +
+              "from cp.`parquet/fixedlenDecimal.parquet` limit 1)");
+
+      final List<String> columnNames = Lists.newArrayList();
+      columnNames.add("decimal9_col");
+      columnNames.add("decimal18_col");
+
+      final String query = "select count(*) as c from dfs_test.tmp.table_with_decimal\n" +
+                            "where (cast(%1$s as varchar) = '-124' and (%1$s <= -5)) or (%1$s <= -256)";
+
+      for (String colName : columnNames) {
+        testBuilder()
+          .sqlQuery(query, colName)
+          .unOrdered()
+          .baselineColumns("c")
+          .baselineValues(1L)
+          .build()
+          .run();
+      }
+    } finally {
+      test("drop table if exists dfs_test.tmp.table_with_decimal");
+      test("alter session reset planner.enable_decimal_data_type");
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/test/resources/decimal/cast_decimal_float.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/decimal/cast_decimal_float.json b/exec/java-exec/src/test/resources/decimal/cast_decimal_float.json
new file mode 100644
index 0000000..3fc94a0
--- /dev/null
+++ b/exec/java-exec/src/test/resources/decimal/cast_decimal_float.json
@@ -0,0 +1,45 @@
+{
+  "head" : {
+    "version" : 1,
+    "generator" : {
+      "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+      "info" : ""
+    },
+    "type" : "APACHE_DRILL_PHYSICAL",
+    "resultMode" : "EXEC"
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"fs-scan",
+      format: {type: "json"},
+      storage:{type: "file", connection: "classpath:///"},
+      files:["/input_simple_decimal.json"]
+    }, {
+      "pop" : "project",
+      "@id" : 2,
+      "exprs" : [
+        { "ref" : "DEC9_COL", "expr": "(cast(DEC9 as decimal9(9, 4)))" },
+        { "ref" : "DEC38_COL", "expr": "(cast(DEC18 as decimal38sparse(38, 4)))" }
+      ],
+
+      "child" : 1
+    },
+    {
+      "pop" : "project",
+      "@id" : 4,
+      "exprs" : [
+        {"ref": "DEC9_FLOAT", "expr" : "cast(DEC9_COL as float4)"},
+        {"ref": "DEC38_FLOAT", "expr" : "cast(DEC38_COL as float4)"},
+        {"ref": "DEC9_DOUBLE", "expr" : "cast(DEC9_COL as float8)"},
+        {"ref": "DEC38_DOUBLE", "expr" : "cast(DEC38_COL as float8)"}
+      ],
+
+      "child" : 2
+    },
+    {
+      "pop" : "screen",
+      "@id" : 5,
+      "child" : 4
+    } ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/test/resources/decimal/cast_decimal_int.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/decimal/cast_decimal_int.json b/exec/java-exec/src/test/resources/decimal/cast_decimal_int.json
new file mode 100644
index 0000000..2cca865
--- /dev/null
+++ b/exec/java-exec/src/test/resources/decimal/cast_decimal_int.json
@@ -0,0 +1,45 @@
+{
+  "head" : {
+    "version" : 1,
+    "generator" : {
+      "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+      "info" : ""
+    },
+    "type" : "APACHE_DRILL_PHYSICAL",
+    "resultMode" : "EXEC"
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"fs-scan",
+      format: {type: "json"},
+      storage:{type: "file", connection: "classpath:///"},
+      files:["/input_simple_decimal.json"]
+    }, {
+      "pop" : "project",
+      "@id" : 2,
+      "exprs" : [
+        { "ref" : "DEC9_COL", "expr": "(cast(cast(INT_COL as int) as decimal9(9, 0)))" },
+        { "ref" : "DEC38_COL", "expr": "(cast(BIGINT_COL as decimal38sparse(38, 0)))" }
+      ],
+
+      "child" : 1
+    },
+    {
+      "pop" : "project",
+      "@id" : 4,
+      "exprs" : [
+        {"ref": "DEC9_INT", "expr" : "cast(DEC9_COL as int)"},
+        {"ref": "DEC38_INT", "expr" : "cast(DEC38_COL as int)"},
+        {"ref": "DEC9_BIGINT", "expr" : "cast(DEC9_COL as bigint)"},
+        {"ref": "DEC38_BIGINT", "expr" : "cast(DEC38_COL as bigint)"}
+      ],
+
+      "child" : 2
+    },
+    {
+      "pop" : "screen",
+      "@id" : 5,
+      "child" : 4
+    } ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/test/resources/decimal/cast_int_decimal.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/decimal/cast_int_decimal.json b/exec/java-exec/src/test/resources/decimal/cast_int_decimal.json
new file mode 100644
index 0000000..885ff53
--- /dev/null
+++ b/exec/java-exec/src/test/resources/decimal/cast_int_decimal.json
@@ -0,0 +1,45 @@
+{
+  "head" : {
+    "version" : 1,
+    "generator" : {
+      "type" : "org.apache.drill.exec.planner.logical.DrillImplementor",
+      "info" : ""
+    },
+    "type" : "APACHE_DRILL_PHYSICAL",
+    "resultMode" : "EXEC"
+  },
+  graph:[
+    {
+      @id:1,
+      pop:"fs-scan",
+      format: {type: "json"},
+      storage:{type: "file", connection: "classpath:///"},
+      files:["/input_simple_decimal.json"]
+    }, {
+      "pop" : "project",
+      "@id" : 2,
+      "exprs" : [
+        { "ref" : "INT_COL", "expr": "(cast(INT_COL as int))" },
+        { "ref" : "BIGINT_COL", "expr": "(cast(BIGINT_COL as bigint))" }
+      ],
+
+      "child" : 1
+    },
+    {
+      "pop" : "project",
+      "@id" : 4,
+      "exprs" : [
+        {"ref": "DEC9_INT", "expr" : "cast(INT_COL as decimal9(9, 0))"},
+        {"ref": "DEC38_INT", "expr" : "cast(INT_COL as decimal38sparse(38, 0))"},
+        {"ref": "DEC9_BIGINT", "expr" : "cast(BIGINT_COL as decimal9(9, 0))"},
+        {"ref": "DEC38_BIGINT", "expr" : "cast(BIGINT_COL as decimal38sparse(38, 0))"}
+      ],
+
+      "child" : 2
+    },
+    {
+      "pop" : "screen",
+      "@id" : 5,
+      "child" : 4
+    } ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/35a1ec66/exec/java-exec/src/test/resources/input_simple_decimal.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/input_simple_decimal.json b/exec/java-exec/src/test/resources/input_simple_decimal.json
index b7c9481..d3687db 100644
--- a/exec/java-exec/src/test/resources/input_simple_decimal.json
+++ b/exec/java-exec/src/test/resources/input_simple_decimal.json
@@ -1,24 +1,36 @@
 {
 "DEC9": "99",
-"DEC18": "123456789"
+"DEC18": "123456789",
+"INT_COL": "0",
+"BIGINT_COL": "0"
 }
 {
 "DEC9": "11.1234567890123456",
-"DEC18": "11.123456789"
+"DEC18": "11.123456789",
+"INT_COL": "1",
+"BIGINT_COL": "1"
 }
 {
 "DEC9": "0.100000000001",
-"DEC18":"0.100000000001"
+"DEC18":"0.100000000001",
+"INT_COL": "-1",
+"BIGINT_COL": "-1"
 }
 {
 "DEC9": "-0.12",
-"DEC18":"-0.1004"
+"DEC18":"-0.1004",
+"INT_COL": "2147483647",
+"BIGINT_COL": "9223372036854775807"
 }
 {
 "DEC9": "-123.1234",
-"DEC18": "-987654321.1234567891"
+"DEC18": "-987654321.1234567891",
+"INT_COL": "-2147483648",
+"BIGINT_COL": "-9223372036854775808"
 }
 {
 "DEC9": "-1.0001",
-"DEC18":"-2.0301"
+"DEC18":"-2.0301",
+"INT_COL": "123456789",
+"BIGINT_COL": "123456789"
 }


[2/3] drill git commit: DRILL-5420: ParquetAsyncPgReader goes into infinite loop during cleanup

Posted by pr...@apache.org.
DRILL-5420: ParquetAsyncPgReader goes into infinite loop during cleanup

PageQueue is now cleaned up using poll() instead of take(): the blocking take() call was constantly interrupted during cleanup, causing CPU churn.
During a columnReader shutdown, a flag is set so as to block any new page reading tasks from being submitted.

closes #862


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0e1e6042
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0e1e6042
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0e1e6042

Branch: refs/heads/master
Commit: 0e1e6042f507ac0da2a21d7de3bbed4c0dac549a
Parents: 0b830ef
Author: Kunal Khatua <kk...@maprtech.com>
Authored: Wed Jun 28 17:39:08 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Mon Jul 3 18:02:11 2017 -0700

----------------------------------------------------------------------
 .../parquet/columnreaders/AsyncPageReader.java  | 21 +++++++++++++-------
 .../parquet/columnreaders/BatchReader.java      |  4 +++-
 .../parquet/columnreaders/ColumnReader.java     | 11 +++++++---
 .../columnreaders/VarLenBinaryReader.java       |  4 +++-
 4 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 926436c..8c89e3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -130,7 +130,10 @@ class AsyncPageReader extends PageReader {
 
   @Override protected void init() throws IOException {
     super.init();
-    asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+    //Avoid Init if a shutdown is already in progress even if init() is called once
+    if (!parentColumnReader.isShuttingDown) {
+      asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+    }
   }
 
   private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
@@ -224,7 +227,7 @@ class AsyncPageReader extends PageReader {
         }
         //if the queue was full before we took a page out, then there would
         // have been no new read tasks scheduled. In that case, schedule a new read.
-        if (pageQueueFull) {
+        if (!parentColumnReader.isShuttingDown && pageQueueFull) {
           asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
         }
       }
@@ -256,7 +259,7 @@ class AsyncPageReader extends PageReader {
             }
             //if the queue was full before we took a page out, then there would
             // have been no new read tasks scheduled. In that case, schedule a new read.
-            if (pageQueueFull) {
+            if (!parentColumnReader.isShuttingDown && pageQueueFull) {
               asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
             }
           }
@@ -278,6 +281,7 @@ class AsyncPageReader extends PageReader {
   }
 
   @Override public void clear() {
+    //Cancelling all existing AsyncPageReaderTasks
     while (asyncPageRead != null && !asyncPageRead.isEmpty()) {
       try {
         Future<Void> f = asyncPageRead.poll();
@@ -298,12 +302,13 @@ class AsyncPageReader extends PageReader {
     while (!pageQueue.isEmpty()) {
       r = null;
       try {
-        r = pageQueue.take();
+        r = pageQueue.poll();
         if (r == ReadStatus.EMPTY) {
           break;
         }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        //Reporting because we shouldn't get this
+        logger.error(e.getMessage());
       } finally {
         if (r != null && r.pageData != null) {
           r.pageData.release();
@@ -412,6 +417,7 @@ class AsyncPageReader extends PageReader {
       try {
         PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
         int compressedSize = pageHeader.getCompressed_page_size();
+        if ( parent.parentColumnReader.isShuttingDown ) { return null; } //Opportunity to skip expensive Parquet processing
         pageData = parent.dataReader.getNext(compressedSize);
         bytesRead = compressedSize;
 
@@ -438,7 +444,7 @@ class AsyncPageReader extends PageReader {
           queue.put(readStatus);
           // if the queue is not full, schedule another read task immediately. If it is then the consumer
           // will schedule a new read task as soon as it removes a page from the queue.
-          if (queue.remainingCapacity() > 0) {
+          if (!parentColumnReader.isShuttingDown && queue.remainingCapacity() > 0) {
             asyncPageRead.offer(parent.threadPool.submit(new AsyncPageReaderTask(debugName, queue)));
           }
         }
@@ -454,6 +460,7 @@ class AsyncPageReader extends PageReader {
         }
         parent.handleAndThrowException(e, "Exception occurred while reading from disk.");
       } finally {
+        //Nothing to do if isShuttingDown.
     }
       return null;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
index 651c813..367b226 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
@@ -68,7 +68,9 @@ public abstract class BatchReader {
     ArrayList<Future<Long>> futures = Lists.newArrayList();
     for (ColumnReader<?> crs : readState.getColumnReaders()) {
       Future<Long> f = crs.processPagesAsync(recordsToRead);
-      futures.add(f);
+      if (f != null) {
+        futures.add(f);
+      }
     }
     Exception exception = null;
     for(Future<Long> f: futures){

http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index 2d8f556..8b6e7a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -81,6 +81,8 @@ public abstract class ColumnReader<V extends ValueVector> {
   long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
   private ExecutorService threadPool;
 
+  volatile boolean isShuttingDown; //Indicate to not submit any new AsyncPageReader Tasks during clear()
+
   protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
       ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
     this.parentReader = parentReader;
@@ -125,7 +127,7 @@ public abstract class ColumnReader<V extends ValueVector> {
   }
 
   public Future<Long> processPagesAsync(long recordsToReadInThisPass){
-    Future<Long> r = threadPool.submit(new ColumnReaderProcessPagesTask(recordsToReadInThisPass));
+    Future<Long> r = (isShuttingDown ? null : threadPool.submit(new ColumnReaderProcessPagesTask(recordsToReadInThisPass)));
     return r;
   }
 
@@ -143,6 +145,9 @@ public abstract class ColumnReader<V extends ValueVector> {
   }
 
   public void clear() {
+    //State to indicate no more tasks to be scheduled
+    isShuttingDown = true;
+
     valueVec.clear();
     pageReader.clear();
   }
@@ -190,8 +195,8 @@ public abstract class ColumnReader<V extends ValueVector> {
     return checkVectorCapacityReached();
   }
 
-  protected Future<Integer> readRecordsAsync(int recordsToRead){
-    Future<Integer> r = threadPool.submit(new ColumnReaderReadRecordsTask(recordsToRead));
+  protected Future<Integer> readRecordsAsync(int recordsToRead) {
+    Future<Integer> r = (isShuttingDown ? null : threadPool.submit(new ColumnReaderReadRecordsTask(recordsToRead)));
     return r;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/0e1e6042/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index b598ac8..900348f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -119,7 +119,9 @@ public class VarLenBinaryReader {
     ArrayList<Future<Integer>> futures = Lists.newArrayList();
     for (VarLengthColumn<?> columnReader : columns) {
       Future<Integer> f = columnReader.readRecordsAsync(columnReader.pageReader.valuesReadyToRead);
-      futures.add(f);
+      if (f != null) {
+        futures.add(f);
+      }
     }
     Exception exception = null;
     for(Future<Integer> f: futures){