Posted to commits@drill.apache.org by ti...@apache.org on 2018/06/14 04:11:19 UTC

[drill] branch master updated (2427aa0 -> ac8e698)

This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from 2427aa0  DRILL-6476: Generate explain plan which shows relation between Lateral and the corresponding Unnest.
     new 544a7a0  DRILL-6488 - change instances of "template inline" to just "template"
     new 98dbc3a  DRILL-6474: Don't use TopN when order by and offset are used without a limit specified.
     new ac8e698  DRILL-6353: Upgrade Parquet MR dependencies

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
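
The DRILL-6474 change above is described in its own 02/03 message; only the planning
rule it touches (PushLimitToTopN.java) appears in the summary below. As a rough,
hypothetical sketch of the behaviour being fixed (the class and method names here are
illustrative, not Drill's planner API), a TopN is only chosen when a bounded fetch exists:

final class TopNPlanningSketch {

  /** @return true if "ORDER BY ... [OFFSET o] LIMIT f" can run as a TopN. */
  static boolean canUseTopN(Long fetch) {
    // "ORDER BY c OFFSET 10" without a LIMIT leaves fetch == null: the TopN heap cannot
    // be bounded, so the planner falls back to a full sort followed by the limit/offset.
    return fetch != null;
  }
}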


Summary of changes:
 .../client/src/include/drill/drillClient.hpp       | 70 ++++++++++----------
 contrib/storage-hive/hive-exec-shade/pom.xml       | 25 +++++--
 exec/java-exec/pom.xml                             | 75 ---------------------
 .../exec/expr/stat/ParquetComparisonPredicate.java | 14 ++--
 .../drill/exec/expr/stat/ParquetIsPredicate.java   |  9 +--
 .../exec/expr/stat/ParquetPredicatesHelper.java    |  4 +-
 .../exec/planner/physical/PushLimitToTopN.java     | 21 ++++--
 .../parquet/AbstractParquetScanBatchCreator.java   | 14 +++-
 .../drill/exec/store/parquet/ColumnDataReader.java |  9 +--
 .../drill/exec/store/parquet/FooterGatherer.java   |  4 +-
 .../parquet/ParquetDirectByteBufferAllocator.java  | 76 ++++++----------------
 .../exec/store/parquet/ParquetRecordWriter.java    | 13 +++-
 .../store/parquet/columnreaders/PageReader.java    | 51 +++++++++------
 .../columnreaders/VarLenBulkPageReader.java        | 17 ++---
 .../columnreaders/VarLenColumnBulkInput.java       |  4 +-
 .../exec/store/parquet/metadata/Metadata.java      | 16 ++++-
 .../filereader/BufferedDirectBufInputStream.java   |  8 +--
 .../exec/util/filereader/DirectBufInputStream.java | 11 +++-
 .../parquet/hadoop/ColumnChunkIncReadStore.java    | 39 ++++++-----
 .../hadoop/ParquetColumnChunkPageWriteStore.java   | 10 +--
 .../physical/impl/limit/TestLimitOperator.java     | 23 +++++++
 .../physical/impl/limit/TestLimitPlanning.java}    | 23 +++----
 .../store/parquet/TestParquetMetadataCache.java    |  4 ++
 .../org/apache/drill/test/DrillTestWrapper.java    | 24 +++++--
 .../java/org/apache/drill/test/TestBuilder.java    | 35 +++++-----
 pom.xml                                            | 40 +++++++++++-
 26 files changed, 326 insertions(+), 313 deletions(-)
 copy exec/java-exec/src/{main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java => test/java/org/apache/drill/exec/physical/impl/limit/TestLimitPlanning.java} (67%)

-- 
To stop receiving notification emails like this one, please contact
timothyfarkas@apache.org.

[drill] 01/03: DRILL-6488 - change instances of "template inline" to just "template"

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 544a7a0def08882aca63d9a18b60d168dff8b6f7
Author: Patrick Wong <pw...@maprtech.com>
AuthorDate: Mon Jun 11 17:05:19 2018 -0700

    DRILL-6488 - change instances of "template inline" to just "template"
    
    closes #1317
---
 .../client/src/include/drill/drillClient.hpp       | 70 +++++++++++-----------
 1 file changed, 35 insertions(+), 35 deletions(-)

diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp
index a09e666..7eabb50 100644
--- a/contrib/native/client/src/include/drill/drillClient.hpp
+++ b/contrib/native/client/src/include/drill/drillClient.hpp
@@ -201,15 +201,15 @@ namespace meta {
     _DL_INTERVAL_MINUTE_TO_SECOND = 1 << 16L
   };
 
-  template inline _DateTimeLiteralSupport operator&(_DateTimeLiteralSupport __a, _DateTimeLiteralSupport __b);
-  template inline _DateTimeLiteralSupport operator|(_DateTimeLiteralSupport __a, _DateTimeLiteralSupport __b);
-  template inline _DateTimeLiteralSupport operator^(_DateTimeLiteralSupport __a, _DateTimeLiteralSupport __b);
+  template _DateTimeLiteralSupport operator&(_DateTimeLiteralSupport __a, _DateTimeLiteralSupport __b);
+  template _DateTimeLiteralSupport operator|(_DateTimeLiteralSupport __a, _DateTimeLiteralSupport __b);
+  template _DateTimeLiteralSupport operator^(_DateTimeLiteralSupport __a, _DateTimeLiteralSupport __b);
 
-  template inline _DateTimeLiteralSupport& operator&=(_DateTimeLiteralSupport& __a, _DateTimeLiteralSupport __b);
-  template inline _DateTimeLiteralSupport& operator|=(_DateTimeLiteralSupport& __a, _DateTimeLiteralSupport __b);
-  template inline _DateTimeLiteralSupport& operator^=(_DateTimeLiteralSupport& __a, _DateTimeLiteralSupport __b);
+  template _DateTimeLiteralSupport& operator&=(_DateTimeLiteralSupport& __a, _DateTimeLiteralSupport __b);
+  template _DateTimeLiteralSupport& operator|=(_DateTimeLiteralSupport& __a, _DateTimeLiteralSupport __b);
+  template _DateTimeLiteralSupport& operator^=(_DateTimeLiteralSupport& __a, _DateTimeLiteralSupport __b);
 
-  template inline _DateTimeLiteralSupport operator~(_DateTimeLiteralSupport __a);
+  template _DateTimeLiteralSupport operator~(_DateTimeLiteralSupport __a);
 
   /**
    * Date time literal support flags
@@ -259,15 +259,15 @@ namespace meta {
       _C_GROUPBY    = 1 << 1L
   };
 
-  template inline _CollateSupport operator&(_CollateSupport __a, _CollateSupport __b);
-  template inline _CollateSupport operator|(_CollateSupport __a, _CollateSupport __b);
-  template inline _CollateSupport operator^(_CollateSupport __a, _CollateSupport __b);
+  template _CollateSupport operator&(_CollateSupport __a, _CollateSupport __b);
+  template _CollateSupport operator|(_CollateSupport __a, _CollateSupport __b);
+  template _CollateSupport operator^(_CollateSupport __a, _CollateSupport __b);
 
-  template inline _CollateSupport& operator&=(_CollateSupport& __a, _CollateSupport __b);
-  template inline _CollateSupport& operator|=(_CollateSupport& __a, _CollateSupport __b);
-  template inline _CollateSupport& operator^=(_CollateSupport& __a, _CollateSupport __b);
+  template _CollateSupport& operator&=(_CollateSupport& __a, _CollateSupport __b);
+  template _CollateSupport& operator|=(_CollateSupport& __a, _CollateSupport __b);
+  template _CollateSupport& operator^=(_CollateSupport& __a, _CollateSupport __b);
 
-  template inline _CollateSupport operator~(_CollateSupport __a);
+  template _CollateSupport operator~(_CollateSupport __a);
 
 
   /**
@@ -339,15 +339,15 @@ namespace meta {
       _OJ_ALL_COMPARISON_OPS    = 1 << 7L //!< _OJ_ALL_COMPARISON_OPS
   };
 
-  template inline _OuterJoinSupport operator&(_OuterJoinSupport __a, _OuterJoinSupport __b);
-  template inline _OuterJoinSupport operator|(_OuterJoinSupport __a, _OuterJoinSupport __b);
-  template inline _OuterJoinSupport operator^(_OuterJoinSupport __a, _OuterJoinSupport __b);
+  template _OuterJoinSupport operator&(_OuterJoinSupport __a, _OuterJoinSupport __b);
+  template _OuterJoinSupport operator|(_OuterJoinSupport __a, _OuterJoinSupport __b);
+  template _OuterJoinSupport operator^(_OuterJoinSupport __a, _OuterJoinSupport __b);
 
-  template inline _OuterJoinSupport& operator&=(_OuterJoinSupport& __a, _OuterJoinSupport __b);
-  template inline _OuterJoinSupport& operator|=(_OuterJoinSupport& __a, _OuterJoinSupport __b);
-  template inline _OuterJoinSupport& operator^=(_OuterJoinSupport& __a, _OuterJoinSupport __b);
+  template _OuterJoinSupport& operator&=(_OuterJoinSupport& __a, _OuterJoinSupport __b);
+  template _OuterJoinSupport& operator|=(_OuterJoinSupport& __a, _OuterJoinSupport __b);
+  template _OuterJoinSupport& operator^=(_OuterJoinSupport& __a, _OuterJoinSupport __b);
 
-  template inline _OuterJoinSupport operator~(_OuterJoinSupport __a);
+  template _OuterJoinSupport operator~(_OuterJoinSupport __a);
 
   /**
    * Outer join support flags
@@ -404,15 +404,15 @@ namespace meta {
       _SQ_IN_QUANTIFIED = 1 << 5L
   };
 
-  template inline _SubQuerySupport operator&(_SubQuerySupport __a, _SubQuerySupport __b);
-  template inline _SubQuerySupport operator|(_SubQuerySupport __a, _SubQuerySupport __b);
-  template inline _SubQuerySupport operator^(_SubQuerySupport __a, _SubQuerySupport __b);
+  template _SubQuerySupport operator&(_SubQuerySupport __a, _SubQuerySupport __b);
+  template _SubQuerySupport operator|(_SubQuerySupport __a, _SubQuerySupport __b);
+  template _SubQuerySupport operator^(_SubQuerySupport __a, _SubQuerySupport __b);
 
-  template inline _SubQuerySupport& operator&=(_SubQuerySupport& __a, _SubQuerySupport __b);
-  template inline _SubQuerySupport& operator|=(_SubQuerySupport& __a, _SubQuerySupport __b);
-  template inline _SubQuerySupport& operator^=(_SubQuerySupport& __a, _SubQuerySupport __b);
+  template _SubQuerySupport& operator&=(_SubQuerySupport& __a, _SubQuerySupport __b);
+  template _SubQuerySupport& operator|=(_SubQuerySupport& __a, _SubQuerySupport __b);
+  template _SubQuerySupport& operator^=(_SubQuerySupport& __a, _SubQuerySupport __b);
 
-  template inline _SubQuerySupport operator~(_SubQuerySupport __a);
+  template _SubQuerySupport operator~(_SubQuerySupport __a);
 
   /**
    * SubQuery support flags
@@ -442,15 +442,15 @@ namespace meta {
       _U_UNION_ALL  = 1 << 2L //!< _U_UNION_ALL
   };
 
-  template inline _UnionSupport operator&(_UnionSupport __a, _UnionSupport __b);
-  template inline _UnionSupport operator|(_UnionSupport __a, _UnionSupport __b);
-  template inline _UnionSupport operator^(_UnionSupport __a, _UnionSupport __b);
+  template _UnionSupport operator&(_UnionSupport __a, _UnionSupport __b);
+  template _UnionSupport operator|(_UnionSupport __a, _UnionSupport __b);
+  template _UnionSupport operator^(_UnionSupport __a, _UnionSupport __b);
 
-  template inline _UnionSupport& operator&=(_UnionSupport& __a, _UnionSupport __b);
-  template inline _UnionSupport& operator|=(_UnionSupport& __a, _UnionSupport __b);
-  template inline _UnionSupport& operator^=(_UnionSupport& __a, _UnionSupport __b);
+  template _UnionSupport& operator&=(_UnionSupport& __a, _UnionSupport __b);
+  template _UnionSupport& operator|=(_UnionSupport& __a, _UnionSupport __b);
+  template _UnionSupport& operator^=(_UnionSupport& __a, _UnionSupport __b);
 
-  template inline _UnionSupport operator~(_UnionSupport __a);
+  template _UnionSupport operator~(_UnionSupport __a);
 
   /**
    * Union support flags

-- 
To stop receiving notification emails like this one, please contact
timothyfarkas@apache.org.

[drill] 03/03: DRILL-6353: Upgrade Parquet MR dependencies

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ac8e69847659582e36c89fd52bb0856ab3bfbd21
Author: Vlad Rozov <vr...@apache.org>
AuthorDate: Wed May 9 13:24:11 2018 -0700

    DRILL-6353: Upgrade Parquet MR dependencies
    
    closes #1259
---
 contrib/storage-hive/hive-exec-shade/pom.xml       | 25 +++++--
 exec/java-exec/pom.xml                             | 75 ---------------------
 .../exec/expr/stat/ParquetComparisonPredicate.java | 14 ++--
 .../drill/exec/expr/stat/ParquetIsPredicate.java   |  9 +--
 .../exec/expr/stat/ParquetPredicatesHelper.java    |  4 +-
 .../parquet/AbstractParquetScanBatchCreator.java   | 14 +++-
 .../drill/exec/store/parquet/ColumnDataReader.java |  9 +--
 .../drill/exec/store/parquet/FooterGatherer.java   |  4 +-
 .../parquet/ParquetDirectByteBufferAllocator.java  | 76 ++++++----------------
 .../exec/store/parquet/ParquetRecordWriter.java    | 13 +++-
 .../store/parquet/columnreaders/PageReader.java    | 51 +++++++++------
 .../columnreaders/VarLenBulkPageReader.java        | 17 ++---
 .../columnreaders/VarLenColumnBulkInput.java       |  4 +-
 .../exec/store/parquet/metadata/Metadata.java      | 16 ++++-
 .../filereader/BufferedDirectBufInputStream.java   |  8 +--
 .../exec/util/filereader/DirectBufInputStream.java | 11 +++-
 .../parquet/hadoop/ColumnChunkIncReadStore.java    | 39 ++++++-----
 .../hadoop/ParquetColumnChunkPageWriteStore.java   | 10 +--
 .../store/parquet/TestParquetMetadataCache.java    |  4 ++
 pom.xml                                            | 40 +++++++++++-
 20 files changed, 209 insertions(+), 234 deletions(-)

diff --git a/contrib/storage-hive/hive-exec-shade/pom.xml b/contrib/storage-hive/hive-exec-shade/pom.xml
index 6f511ad..98fd4b8 100644
--- a/contrib/storage-hive/hive-exec-shade/pom.xml
+++ b/contrib/storage-hive/hive-exec-shade/pom.xml
@@ -31,6 +31,20 @@
   <packaging>jar</packaging>
   <name>contrib/hive-storage-plugin/hive-exec-shaded</name>
 
+  <properties>
+    <hive.parquet.version>1.8.3</hive.parquet.version>
+  </properties>
+
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-hadoop-bundle</artifactId>
+        <version>${hive.parquet.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.hive</groupId>
@@ -68,11 +82,6 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <!--Once newer hive-exec version leverages parquet-column 1.9.0, this dependency can be deleted -->
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-column</artifactId>
-    </dependency>
   </dependencies>
 
   <build>
@@ -83,7 +92,7 @@
           <artifactSet>
             <includes>
               <include>org.apache.hive:hive-exec</include>
-              <include>org.apache.parquet:parquet-column</include>
+              <include>org.apache.parquet:parquet-hadoop-bundle</include>
               <include>commons-codec:commons-codec</include>
               <include>com.fasterxml.jackson.core:jackson-databind</include>
               <include>com.fasterxml.jackson.core:jackson-annotations</include>
@@ -118,6 +127,10 @@
               <shadedPattern>hive.org.apache.parquet.</shadedPattern>
             </relocation>
             <relocation>
+              <pattern>shaded.parquet.</pattern>
+              <shadedPattern>hive.shaded.parquet.</shadedPattern>
+            </relocation>
+            <relocation>
               <pattern>org.apache.avro.</pattern>
               <shadedPattern>hive.org.apache.avro.</shadedPattern>
             </relocation>
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 2205c2f..7701e76 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -249,92 +249,17 @@
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-hadoop</artifactId>
-      <version>${parquet.version}</version>
       <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-format</artifactId>
-      <version>2.3.0-incubating</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-common</artifactId>
       <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-jackson</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-encoding</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-generator</artifactId>
-      <version>${parquet.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>javax.inject</groupId>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
index 9e561ad..ebceefb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
@@ -113,7 +113,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
       // can drop when left's max < right's min, or right's max < left's min
       final C leftMin = leftStat.genericGetMin();
       final C rightMin = rightStat.genericGetMin();
-      return leftStat.genericGetMax().compareTo(rightMin) < 0 || rightStat.genericGetMax().compareTo(leftMin) < 0;
+      return (leftStat.compareMaxToValue(rightMin) < 0) || (rightStat.compareMaxToValue(leftMin) < 0);
     }) {
       @Override
       public String toString() {
@@ -132,7 +132,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
     return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
       // can drop when left's max <= right's min.
       final C rightMin = rightStat.genericGetMin();
-      return leftStat.genericGetMax().compareTo(rightMin) <= 0;
+      return leftStat.compareMaxToValue(rightMin) <= 0;
     });
   }
 
@@ -146,7 +146,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
     return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
       // can drop when left's max < right's min.
       final C rightMin = rightStat.genericGetMin();
-      return leftStat.genericGetMax().compareTo(rightMin) < 0;
+      return leftStat.compareMaxToValue(rightMin) < 0;
     });
   }
 
@@ -160,7 +160,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
     return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
       // can drop when right's max <= left's min.
       final C leftMin = leftStat.genericGetMin();
-      return rightStat.genericGetMax().compareTo(leftMin) <= 0;
+      return rightStat.compareMaxToValue(leftMin) <= 0;
     });
   }
 
@@ -173,7 +173,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
     return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
       // can drop when right's max < left's min.
       final C leftMin = leftStat.genericGetMin();
-      return rightStat.genericGetMax().compareTo(leftMin) < 0;
+      return rightStat.compareMaxToValue(leftMin) < 0;
     });
   }
 
@@ -188,8 +188,8 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
       // can drop when there is only one unique value.
       final C leftMax = leftStat.genericGetMax();
       final C rightMax = rightStat.genericGetMax();
-      return leftStat.genericGetMin().compareTo(leftMax) == 0 && rightStat.genericGetMin().compareTo(rightMax) == 0 &&
-          leftStat.genericGetMax().compareTo(rightMax) == 0;
+      return leftStat.compareMinToValue(leftMax) == 0 && rightStat.compareMinToValue(rightMax) == 0 &&
+          leftStat.compareMaxToValue(rightMax) == 0;
     });
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
index 9b04102..547dc06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
@@ -23,6 +23,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.TypedFieldExpr;
 import org.apache.drill.common.expression.visitors.ExprVisitor;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.parquet.column.statistics.BooleanStatistics;
 import org.apache.parquet.column.statistics.Statistics;
 
 import java.util.ArrayList;
@@ -114,7 +115,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
   private static LogicalExpression createIsTruePredicate(LogicalExpression expr) {
     return new ParquetIsPredicate<Boolean>(expr,
         //if max value is not true or if there are all nulls  -> canDrop
-        (exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) || isAllNulls(exprStat, evaluator.getRowCount())
+        (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() || isAllNulls(exprStat, evaluator.getRowCount())
     );
   }
 
@@ -124,7 +125,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
   private static LogicalExpression createIsFalsePredicate(LogicalExpression expr) {
     return new ParquetIsPredicate<Boolean>(expr,
         //if min value is not false or if there are all nulls  -> canDrop
-        (exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) || isAllNulls(exprStat, evaluator.getRowCount())
+        (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() || isAllNulls(exprStat, evaluator.getRowCount())
     );
   }
 
@@ -134,7 +135,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
   private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr) {
     return new ParquetIsPredicate<Boolean>(expr,
         //if min value is not false or if there are no nulls  -> canDrop
-        (exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) && hasNoNulls(exprStat)
+        (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() && hasNoNulls(exprStat)
     );
   }
 
@@ -144,7 +145,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
   private static LogicalExpression createIsNotFalsePredicate(LogicalExpression expr) {
     return new ParquetIsPredicate<Boolean>(expr,
         //if max value is not true or if there are no nulls  -> canDrop
-        (exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) && hasNoNulls(exprStat)
+        (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() && hasNoNulls(exprStat)
     );
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
index 7ff1036..f804a7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
@@ -43,7 +43,7 @@ class ParquetPredicatesHelper {
    *          False if at least one row is not null.
    */
   static boolean isAllNulls(Statistics stat, long rowCount) {
-    return stat.getNumNulls() == rowCount;
+    return stat.isNumNullsSet() && stat.getNumNulls() == rowCount;
   }
 
   /**
@@ -54,7 +54,7 @@ class ParquetPredicatesHelper {
    *          False if the parquet file hasn't nulls.
    */
   static boolean hasNoNulls(Statistics stat) {
-    return stat.getNumNulls() == 0;
+    return !stat.isNumNullsSet() || stat.getNumNulls() == 0;
   }
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index 6a320b8..dc09ce1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -33,11 +33,12 @@ import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
@@ -50,6 +51,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.drill.exec.store.parquet.metadata.Metadata.PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
 public abstract class AbstractParquetScanBatchCreator {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
@@ -146,11 +150,15 @@ public abstract class AbstractParquetScanBatchCreator {
   protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
 
   private ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
-    Configuration newConf = new Configuration(conf);
+    conf = new Configuration(conf);
     conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
     conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
     conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
-    return ParquetFileReader.readFooter(newConf, new Path(path), ParquetMetadataConverter.NO_FILTER);
+    conf.setBoolean(PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED, true);
+    ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(NO_FILTER).build();
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) {
+      return reader.getFooter();
+    }
   }
 
   private boolean isComplex(ParquetMetadata footer) {
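
For reference, the new footer-read path shown in the hunk above, pulled together as a
self-contained sketch (illustrative only; it mirrors the calls in the patch rather than
reproducing Drill's helper):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

class FooterReadSketch {
  // ParquetFileReader.readFooter(...) is gone after the upgrade; the footer now comes
  // from a reader opened with a HadoopInputFile and ParquetReadOptions.
  static ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
    Configuration newConf = new Configuration(conf);
    // Same flag the patch sets via Metadata.PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED.
    newConf.setBoolean("parquet.strings.signed-min-max.enabled", true);
    ParquetReadOptions options = ParquetReadOptions.builder()
        .withMetadataFilter(ParquetMetadataConverter.NO_FILTER)
        .build();
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), newConf), options)) {
      return reader.getFooter();
    }
  }
}
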
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index dcd40cf..79294da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -21,14 +21,13 @@ import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.Util;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
 
 public class ColumnDataReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
@@ -58,11 +57,7 @@ public class ColumnDataReader {
 
   public void loadPage(DrillBuf target, int pageLength) throws IOException {
     target.clear();
-    ByteBuffer directBuffer = target.nioBuffer(0, pageLength);
-    int lengthLeftToRead = pageLength;
-    while (lengthLeftToRead > 0) {
-      lengthLeftToRead -= CompatibilityUtil.getBuf(input, directBuffer, lengthLeftToRead);
-    }
+    HadoopStreams.wrap(input).read(target.nioBuffer(0, pageLength));
     target.writerIndex(pageLength);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index ea34c7d..d1562c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -42,6 +42,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import com.google.common.base.Preconditions;
 
 import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 
 public class FooterGatherer {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FooterGatherer.class);
@@ -160,7 +161,8 @@ public class FooterGatherer {
         footerBytes = ArrayUtils.subarray(footerBytes, start, start + size);
       }
 
-      ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(new ByteArrayInputStream(footerBytes));
+      final ByteArrayInputStream from = new ByteArrayInputStream(footerBytes);
+      ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(from, NO_FILTER);
       Footer footer = new Footer(status.getPath(), metadata);
       return footer;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
index 09f1b26..ba6aac9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -17,10 +17,11 @@
  */
 package org.apache.drill.exec.store.parquet;
 
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Map;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -30,44 +31,42 @@ import org.apache.parquet.bytes.ByteBufferAllocator;
 /**
  * {@link ByteBufferAllocator} implementation that uses Drill's {@link BufferAllocator} to allocate and release
  * {@link ByteBuffer} objects.<br>
- * To properly release an allocated {@link ByteBuf}, this class keeps track of it's corresponding {@link ByteBuffer}
+ * To properly release an allocated {@link DrillBuf}, this class keeps track of it's corresponding {@link ByteBuffer}
  * that was passed to the Parquet library.
  */
 public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
 
   private final BufferAllocator allocator;
-  private final HashMap<Key, ByteBuf> allocatedBuffers = new HashMap<>();
+  private final Map<ByteBuffer, DrillBuf> allocatedBuffers = new IdentityHashMap<>();
 
-  public ParquetDirectByteBufferAllocator(OperatorContext o){
-    allocator = o.getAllocator();
+  public ParquetDirectByteBufferAllocator(OperatorContext o) {
+    this(o.getAllocator());
   }
 
   public ParquetDirectByteBufferAllocator(BufferAllocator allocator) {
     this.allocator = allocator;
   }
 
-
   @Override
   public ByteBuffer allocate(int sz) {
-    ByteBuf bb = allocator.buffer(sz);
-    ByteBuffer b = bb.nioBuffer(0, sz);
-    final Key key = new Key(b);
-    allocatedBuffers.put(key, bb);
-    logger.debug("ParquetDirectByteBufferAllocator: Allocated {} bytes. Allocated ByteBuffer id: {}", sz, key.hash);
-    return b;
+    DrillBuf drillBuf = allocator.buffer(sz);
+    ByteBuffer byteBuffer = drillBuf.nioBuffer(0, sz);
+    allocatedBuffers.put(byteBuffer, drillBuf);
+    logger.debug("{}: Allocated {} bytes. Allocated DrillBuf with id {} and ByteBuffer {}", this, sz, drillBuf.getId(), System.identityHashCode(byteBuffer));
+    return byteBuffer;
   }
 
   @Override
-  public void release(ByteBuffer b) {
-    final Key key = new Key(b);
-    final ByteBuf bb = allocatedBuffers.get(key);
+  public void release(ByteBuffer byteBuffer) {
+    final DrillBuf drillBuf = allocatedBuffers.remove(byteBuffer);
     // The ByteBuffer passed in may already have been freed or not allocated by this allocator.
     // If it is not found in the allocated buffers, do nothing
-    if(bb != null) {
-      logger.debug("ParquetDirectByteBufferAllocator: Freed byte buffer. Allocated ByteBuffer id: {}", key.hash);
-      bb.release();
-      allocatedBuffers.remove(key);
+    if (drillBuf != null) {
+      logger.debug("{}: Freed DrillBuf with id {} and ByteBuffer {}", this, drillBuf.getId(), System.identityHashCode(byteBuffer));
+      drillBuf.release();
+    } else {
+      logger.warn("{}: ByteBuffer {} is not present", this, System.identityHashCode(byteBuffer));
     }
   }
 
@@ -75,41 +74,4 @@ public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
   public boolean isDirect() {
     return true;
   }
-
-  /**
-   * ByteBuffer wrapper that computes a fixed hashcode.
-   * <br><br>
-   * Parquet only handles {@link ByteBuffer} objects, so we need to use them as keys to keep track of their corresponding
-   * {@link ByteBuf}, but {@link ByteBuffer} is mutable and it can't be used as a {@link HashMap} key as it is.<br>
-   * This class solves this by providing a fixed hashcode for {@link ByteBuffer} and uses reference equality in case
-   * of collisions (we don't need to compare the content of {@link ByteBuffer} because the object passed to
-   * {@link #release(ByteBuffer)} will be the same object returned from a previous {@link #allocate(int)}.
-   */
-  private class Key {
-    final int hash;
-    final ByteBuffer buffer;
-
-    Key(final ByteBuffer buffer) {
-      this.buffer = buffer;
-      // remember, we can't use buffer.hashCode()
-      this.hash = System.identityHashCode(buffer);
-    }
-
-    @Override
-    public int hashCode() {
-      return hash;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-      if (!(obj instanceof Key)) {
-        return false;
-      }
-      final Key key = (Key) obj;
-      return hash == key.hash && buffer == key.buffer;
-    }
-  }
 }
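
A small standalone illustration (not part of the patch) of why the hand-rolled Key
wrapper above could be dropped in favour of an IdentityHashMap: ByteBuffer.hashCode()
and equals() are computed from the buffer's remaining content, so they change while
Parquet reads and writes the buffer, whereas identity-based keys stay stable.

import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
import java.util.Map;

class IdentityKeySketch {
  public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocateDirect(16);
    int hashBefore = buffer.hashCode();
    buffer.putInt(42);                                    // mutating the buffer...
    System.out.println(hashBefore != buffer.hashCode());  // ...changes its content-based hash: true

    Map<ByteBuffer, String> byIdentity = new IdentityHashMap<>();
    byIdentity.put(buffer, "tracked");
    buffer.putInt(7);                                     // further mutation does not break the lookup
    System.out.println(byIdentity.get(buffer));           // prints "tracked"
  }
}
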
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 0e40c9e..0917926 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -54,8 +54,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
 import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -241,8 +243,15 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     // once PARQUET-1006 will be resolved
     pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize,
         pageSize, new ParquetDirectByteBufferAllocator(oContext));
-    store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, enableDictionary,
-        writerVersion, new ParquetDirectByteBufferAllocator(oContext));
+    ParquetProperties parquetProperties = ParquetProperties.builder()
+        .withPageSize(pageSize)
+        .withDictionaryEncoding(enableDictionary)
+        .withDictionaryPageSize(initialPageBufferSize)
+        .withWriterVersion(writerVersion)
+        .withAllocator(new ParquetDirectByteBufferAllocator(oContext))
+        .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
+        .build();
+    store = new ColumnWriteStoreV1(pageStore, parquetProperties);
     MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
     consumer = columnIO.getRecordWriter(store);
     setUp(schema, consumer);
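
The builder above replaces the removed six-argument ColumnWriteStoreV1 constructor. A
minimal sketch mapping the old constructor arguments onto the new ParquetProperties
builder (only the Parquet API names are real; the wrapper class is illustrative):

import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;

class WriterPropsSketch {
  static ParquetProperties build(int pageSize, int initialPageBufferSize, boolean enableDictionary,
                                 WriterVersion writerVersion, ByteBufferAllocator allocator) {
    return ParquetProperties.builder()
        .withPageSize(pageSize)                                   // was: pageSize
        .withDictionaryPageSize(initialPageBufferSize)            // was: initialPageBufferSize
        .withDictionaryEncoding(enableDictionary)                 // was: enableDictionary
        .withWriterVersion(writerVersion)                         // was: writerVersion
        .withAllocator(allocator)                                 // was: the allocator argument
        .withValuesWriterFactory(new DefaultV1ValuesWriterFactory()) // keeps the V1 value writers
        .build();
  }
}
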
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index bf75695..01d0644 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import io.netty.buffer.ByteBufUtil;
 import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream;
@@ -30,6 +31,7 @@ import org.apache.drill.exec.util.filereader.DirectBufInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.Encoding;
@@ -250,7 +252,7 @@ class PageReader {
   }
 
   public static BytesInput asBytesInput(DrillBuf buf, int offset, int length) throws IOException {
-    return BytesInput.from(buf.nioBuffer(offset, length), 0, length);
+    return BytesInput.from(buf.nioBuffer(offset, length));
   }
 
 
@@ -319,41 +321,44 @@ class PageReader {
 
     byteLength = pageHeader.uncompressed_page_size;
 
-    final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, pageData.capacity());
+    final ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));
 
     readPosInBytes = 0;
     if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
       repetitionLevels = rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
-      repetitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+      repetitionLevels.initFromPage(currentPageCount, in);
       // we know that the first value will be a 0, at the end of each list of repeated values we will hit another 0 indicating
       // a new record, although we don't know the length until we hit it (and this is a one way stream of integers) so we
       // read the first zero here to simplify the reading processes, and start reading the first value the same as all
       // of the rest. Effectively we are 'reading' the non-existent value in front of the first allowing direct access to
       // the first list of repetition levels
-      readPosInBytes = repetitionLevels.getNextOffset();
+      readPosInBytes = in.position();
       repetitionLevels.readInteger();
     }
-    if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
+    if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0) {
       parentColumnReader.currDefLevel = -1;
       definitionLevels = dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
-      definitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
-      readPosInBytes = definitionLevels.getNextOffset();
+      definitionLevels.initFromPage(currentPageCount, in);
+      readPosInBytes = in.position();
       if (!valueEncoding.usesDictionary()) {
         valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
-        valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+        valueReader.initFromPage(currentPageCount, in);
       }
     }
-    if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
+    if (valueReader == null && parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
       valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
-      valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+      valueReader.initFromPage(currentPageCount, in);
     }
     if (valueEncoding.usesDictionary()) {
       // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for
       // actually copying the values out into the vectors
+      Preconditions.checkState(readPosInBytes < pageData.capacity());
+      int index = (int)readPosInBytes;
+      ByteBuffer byteBuffer = pageData.nioBuffer(index, pageData.capacity() - index);
       dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary);
-      dictionaryLengthDeterminingReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+      dictionaryLengthDeterminingReader.initFromPage(currentPageCount, ByteBufferInputStream.wrap(byteBuffer));
       dictionaryValueReader = new DictionaryValuesReader(dictionary);
-      dictionaryValueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+      dictionaryValueReader.initFromPage(currentPageCount, ByteBufferInputStream.wrap(byteBuffer));
       parentColumnReader.usingDictionary = true;
     } else {
       parentColumnReader.usingDictionary = false;
@@ -445,25 +450,29 @@ class PageReader {
    * @throws IOException An IO related condition
    */
   void resetDefinitionLevelReader(int skipCount) throws IOException {
-    if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0) {
-      throw new UnsupportedOperationException("Unsupoorted Operation");
-    }
+    Preconditions.checkState(parentColumnReader.columnDescriptor.getMaxDefinitionLevel() == 1);
+    Preconditions.checkState(currentPageCount > 0);
 
+    final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
     final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
-    final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, pageData.capacity());
-    final int defStartPos = repetitionLevels != null ? repetitionLevels.getNextOffset() : 0;
+
+    final ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));
+
+    if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
+      repetitionLevels = rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
+      repetitionLevels.initFromPage(currentPageCount, in);
+      repetitionLevels.readInteger();
+    }
+
     definitionLevels = dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
     parentColumnReader.currDefLevel = -1;
 
     // Now reinitialize the underlying decoder
-    assert currentPageCount > 0 : "Page count should be strictly upper than zero";
-    definitionLevels.initFromPage(currentPageCount, pageDataBuffer, defStartPos);
+    definitionLevels.initFromPage(currentPageCount, in);
 
     // Skip values if requested by caller
     for (int idx = 0; idx < skipCount; ++idx) {
       definitionLevels.skip();
     }
   }
-
-
 }
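
The PageReader changes above follow the parquet-mr move from (ByteBuffer, offset) pairs
to a shared ByteBufferInputStream. A minimal sketch of that decoding pattern (the method
and class names here are illustrative, not Drill's):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;

class LevelReaderSketch {
  static void initReaders(ByteBuffer pageData, int valueCount, ValuesReader repetitionLevels,
                          ValuesReader definitionLevels, ValuesReader values) throws IOException {
    ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData);
    repetitionLevels.initFromPage(valueCount, in);  // consumes the repetition-level section
    // in.position() now replaces the old repetitionLevels.getNextOffset() bookkeeping
    definitionLevels.initFromPage(valueCount, in);  // continues where the RL decoder stopped
    values.initFromPage(valueCount, in);            // value decoding starts at the current position
  }
}
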
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
index b6205c1..385cb83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
@@ -66,12 +66,7 @@ final class VarLenBulkPageReader {
     this.buffer.order(ByteOrder.nativeOrder());
 
     if (pageInfoInput != null) {
-      this.pageInfo.pageData = pageInfoInput.pageData;
-      this.pageInfo.pageDataOff = pageInfoInput.pageDataOff;
-      this.pageInfo.pageDataLen = pageInfoInput.pageDataLen;
-      this.pageInfo.numPageFieldsRead = pageInfoInput.numPageFieldsRead;
-      this.pageInfo.definitionLevels = pageInfoInput.definitionLevels;
-      this.pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
+      set(pageInfoInput, false);
     }
 
     this.columnPrecInfo = columnPrecInfoInput;
@@ -87,15 +82,17 @@ final class VarLenBulkPageReader {
     nullableDictionaryReader = new VarLenNullableDictionaryReader(buffer, pageInfo, columnPrecInfo, entry);
   }
 
-  final void set(PageDataInfo pageInfoInput) {
+  final void set(PageDataInfo pageInfoInput, boolean clear) {
     pageInfo.pageData = pageInfoInput.pageData;
     pageInfo.pageDataOff = pageInfoInput.pageDataOff;
     pageInfo.pageDataLen = pageInfoInput.pageDataLen;
     pageInfo.numPageFieldsRead = pageInfoInput.numPageFieldsRead;
     pageInfo.definitionLevels = pageInfoInput.definitionLevels;
     pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
-
-    buffer.clear();
+    pageInfo.numPageValues = pageInfoInput.numPageValues;
+    if (clear) {
+      buffer.clear();
+    }
   }
 
   final VarLenColumnBulkEntry getEntry(int valuesToRead) {
@@ -160,4 +157,4 @@ final class VarLenBulkPageReader {
     }
   }
 
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
index 8daf2cc..1b30737 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
@@ -204,7 +204,7 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
         buffPagePayload = new VarLenBulkPageReader(pageInfo, columnPrecInfo, callback);
 
       } else {
-        buffPagePayload.set(pageInfo);
+        buffPagePayload.set(pageInfo, true);
       }
     } else {
       if (buffPagePayload == null) {
@@ -567,4 +567,4 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
   }
 
 
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index ab655e9..a61cc18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -39,17 +39,19 @@ import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
@@ -87,6 +89,7 @@ public class Metadata {
   public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata.v2"};
   public static final String METADATA_FILENAME = ".drill.parquet_metadata";
   public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories";
+  public static final String PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED = "parquet.strings.signed-min-max.enabled";
 
   private final ParquetFormatConfig formatConfig;
 
@@ -409,9 +412,16 @@ public class Metadata {
       final FileStatus file, final FileSystem fs) throws IOException, InterruptedException {
     final ParquetMetadata metadata;
     final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
+    final Configuration conf = new Configuration(fs.getConf());
+    final ParquetReadOptions parquetReadOptions = ParquetReadOptions.builder()
+        .useSignedStringMinMax(true)
+        .build();
     try {
-      metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)
-          () -> ParquetFileReader.readFooter(fs.getConf(), file, ParquetMetadataConverter.NO_FILTER));
+      metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)() -> {
+        try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), parquetReadOptions)) {
+          return parquetFileReader.getFooter();
+        }
+      });
     } catch(Exception e) {
       logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}",
         file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
index f208d6e..1d764b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -179,7 +179,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
     int nBytes = 0;
     if (bytesToRead > 0) {
       try {
-        nBytes = CompatibilityUtil.getBuf(getInputStream(), directBuffer, bytesToRead);
+        nBytes = HadoopStreams.wrap(getInputStream()).read(directBuffer);
       } catch (Exception e) {
         logger.error("Error reading from stream {}. Error was : {}", this.streamId, e.getMessage());
         throw new IOException((e));
@@ -193,8 +193,8 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
           logger.trace(
               "PERF: Disk read complete. {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, "
                   + "CurPosInStream: {}, CurPosInBuffer: {}, Time: {} ms", this.streamId, this.startOffset,
-              this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer, ((double) timer.elapsed(TimeUnit.MICROSECONDS))
-                  / 1000);
+              this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer,
+              ((double) timer.elapsed(TimeUnit.MICROSECONDS)) / 1000);
         }
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
index ae09a37..ea2542e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
@@ -23,7 +23,8 @@ import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.SeekableInputStream;
 
 import java.io.FilterInputStream;
 import java.io.IOException;
@@ -86,12 +87,16 @@ public class DirectBufInputStream extends FilterInputStream {
     buf.clear();
     ByteBuffer directBuffer = buf.nioBuffer(0, len);
     int lengthLeftToRead = len;
+    SeekableInputStream seekableInputStream = HadoopStreams.wrap(getInputStream());
     while (lengthLeftToRead > 0) {
       if(logger.isTraceEnabled()) {
         logger.trace("PERF: Disk read start. {}, StartOffset: {}, TotalByteSize: {}", this.streamId, this.startOffset, this.totalByteSize);
       }
       Stopwatch timer = Stopwatch.createStarted();
-      int bytesRead = CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead);
+      int bytesRead = seekableInputStream.read(directBuffer);
+      if (bytesRead < 0) {
+        return bytesRead;
+      }
       lengthLeftToRead -= bytesRead;
       if(logger.isTraceEnabled()) {
         logger.trace(
@@ -113,7 +118,7 @@ public class DirectBufInputStream extends FilterInputStream {
       b.release();
       throw e;
     }
-    if (bytesRead <= -1) {
+    if (bytesRead < 0) {
       b.release();
       return null;
     }
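
The same HadoopStreams pattern recurs in ColumnDataReader, the two filereader classes
above and ColumnChunkIncReadStore below, replacing the removed CompatibilityUtil.getBuf
read loops. A minimal self-contained sketch (illustrative only):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.SeekableInputStream;

class StreamReadSketch {
  static void readPage(FSDataInputStream in, ByteBuffer pageBuffer) throws IOException {
    SeekableInputStream stream = HadoopStreams.wrap(in);
    stream.readFully(pageBuffer);  // fills the buffer's remaining bytes
    pageBuffer.flip();             // make the bytes readable, as ColumnChunkIncReadStore does
  }
}
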
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 6e9db7e..89731ff 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -46,7 +46,7 @@ import org.apache.parquet.format.Util;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
 
 import io.netty.buffer.ByteBuf;
 
@@ -163,12 +163,10 @@ public class ColumnChunkIncReadStore implements PageReadStore {
               ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size);
               lastPage = buf;
               ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
-              int lengthLeftToRead = pageHeader.compressed_page_size;
-              while (lengthLeftToRead > 0) {
-                lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead);
-              }
+              HadoopStreams.wrap(in).readFully(buffer);
+              buffer.flip();
               return new DataPageV1(
-                      decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+                      decompressor.decompress(BytesInput.from(buffer), pageHeader.getUncompressed_page_size()),
                       pageHeader.data_page_header.num_values,
                       pageHeader.uncompressed_page_size,
                       fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
@@ -182,28 +180,33 @@ public class ColumnChunkIncReadStore implements PageReadStore {
               buf = allocator.buffer(pageHeader.compressed_page_size);
               lastPage = buf;
               buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
-              lengthLeftToRead = pageHeader.compressed_page_size;
-              while (lengthLeftToRead > 0) {
-                lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead);
-              }
+              HadoopStreams.wrap(in).readFully(buffer);
+              buffer.flip();
               DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
               int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
               BytesInput decompressedPageData =
                   decompressor.decompress(
-                      BytesInput.from(buffer, 0, pageHeader.compressed_page_size),
+                      BytesInput.from(buffer),
                       pageHeader.uncompressed_page_size);
+              ByteBuffer byteBuffer = decompressedPageData.toByteBuffer();
+              int limit = byteBuffer.limit();
+              byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length());
+              BytesInput repetitionLevels = BytesInput.from(byteBuffer.slice());
+              byteBuffer.position(dataHeaderV2.getRepetition_levels_byte_length());
+              byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length());
+              BytesInput definitionLevels = BytesInput.from(byteBuffer.slice());
+              byteBuffer.position(dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length());
+              byteBuffer.limit(limit);
+              BytesInput data = BytesInput.from(byteBuffer.slice());
+
               return new DataPageV2(
                       dataHeaderV2.getNum_rows(),
                       dataHeaderV2.getNum_nulls(),
                       dataHeaderV2.getNum_values(),
-                      BytesInput.from(decompressedPageData.toByteBuffer(), 0, dataHeaderV2.getRepetition_levels_byte_length()),
-                      BytesInput.from(decompressedPageData.toByteBuffer(),
-                          dataHeaderV2.getRepetition_levels_byte_length(),
-                          dataHeaderV2.getDefinition_levels_byte_length()),
+                      repetitionLevels,
+                      definitionLevels,
                       parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
-                      BytesInput.from(decompressedPageData.toByteBuffer(),
-                          dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length(),
-                          dataSize),
+                      data,
                       uncompressedPageSize,
                       fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()),
                       dataHeaderV2.isIs_compressed()
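
    The V2 data-page handling above carves one decompressed buffer into three views
    (repetition levels, definition levels, data) instead of re-materializing the page for
    each section. A small standalone illustration of that position/limit/slice pattern,
    using made-up lengths rather than anything from the commit:

        import java.nio.ByteBuffer;

        class PageSliceSketch {
          public static void main(String[] args) {
            // Pretend this is a decompressed page: 4 bytes of repetition levels,
            // 6 bytes of definition levels, and the remaining 10 bytes of data.
            ByteBuffer page = ByteBuffer.allocate(20);
            int repLen = 4;
            int defLen = 6;
            int limit = page.limit();

            page.limit(repLen);
            ByteBuffer repetitionLevels = page.slice();   // bytes [0, repLen)

            page.position(repLen);
            page.limit(repLen + defLen);
            ByteBuffer definitionLevels = page.slice();   // bytes [repLen, repLen + defLen)

            page.position(repLen + defLen);
            page.limit(limit);
            ByteBuffer data = page.slice();               // the rest of the page

            System.out.println(repetitionLevels.remaining());  // 4
            System.out.println(definitionLevels.remaining());  // 6
            System.out.println(data.remaining());              // 10
          }
        }

    Each slice() is a view over the backing array between the current position and limit,
    so no page bytes are copied while the three BytesInput sections are built.
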
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
index 93f9920..0ed2245 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -17,8 +17,6 @@
  */
 package org.apache.parquet.hadoop;
 
-import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
@@ -119,7 +117,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
       this.path = path;
       this.compressor = compressor;
       this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
-      this.totalStatistics = getStatsBasedOnType(this.path.getType());
+      this.totalStatistics = Statistics.createStats(this.path.getPrimitiveType());
     }
 
     @Override
@@ -226,11 +224,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
         writer.writeDictionaryPage(dictionaryPage);
         // tracking the dictionary encoding is handled in writeDictionaryPage
       }
-      List<Encoding> encodings = Lists.newArrayList();
-      encodings.addAll(rlEncodings);
-      encodings.addAll(dlEncodings);
-      encodings.addAll(dataEncodings);
-      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, encodings);
+      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, rlEncodings, dlEncodings, dataEncodings);
       writer.endColumn();
       logger.debug(
           String.format(
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 50e679a..1da2530 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -30,6 +30,7 @@ import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -737,6 +738,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
     }
   }
 
+  @Ignore // Statistics for INTERVAL is not available (see PARQUET-1064)
   @Test // DRILL-4139
   public void testIntervalDayPartitionPruning() throws Exception {
     final String intervalDayPartitionTable = "dfs.tmp.`interval_day_partition`";
@@ -762,6 +764,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
     }
   }
 
+  @Ignore // Statistics for INTERVAL is not available (see PARQUET-1064)
   @Test // DRILL-4139
   public void testIntervalYearPartitionPruning() throws Exception {
     final String intervalYearPartitionTable = "dfs.tmp.`interval_yr_partition`";
@@ -812,6 +815,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
     }
   }
 
+  @Ignore // Statistics for DECIMAL is not available (see PARQUET-1322).
   @Test // DRILL-4139
   public void testDecimalPartitionPruning() throws Exception {
     List<String> ctasQueries = Lists.newArrayList();
diff --git a/pom.xml b/pom.xml
index 6078dc7..242b134 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,7 +44,7 @@
     <dep.slf4j.version>1.7.6</dep.slf4j.version>
     <dep.guava.version>18.0</dep.guava.version>
     <forkCount>2</forkCount>
-    <parquet.version>1.8.1-drill-r0</parquet.version>
+    <parquet.version>1.10.0</parquet.version>
     <calcite.version>1.16.0-drill-r3</calcite.version>
     <avatica.version>1.11.0</avatica.version>
     <janino.version>2.7.6</janino.version>
@@ -1522,6 +1522,36 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-format</artifactId>
+        <version>2.5.0</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-common</artifactId>
+        <version>${parquet.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
@@ -2040,6 +2070,14 @@
           	<artifactId>parquet-hadoop</artifactId>
           	<version>${parquet.version}</version>
           	<exclusions>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-client</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.apache.hadoop</groupId>
+                <artifactId>hadoop-common</artifactId>
+              </exclusion>
           		<exclusion>
           			<groupId>org.xerial.snappy</groupId>
           			<artifactId>snappy-java</artifactId>


[drill] 02/03: DRILL-6474: Don't use TopN when order by and offset are used without a limit specified.

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 98dbc3a222990703aebe983883779763e0cdc1e9
Author: Timothy Farkas <ti...@apache.org>
AuthorDate: Wed Jun 6 12:04:39 2018 -0700

    DRILL-6474: Don't use TopN when order by and offset are used without a limit specified.
    
    closes #1313
---
 .../exec/planner/physical/PushLimitToTopN.java     | 21 +++++++++----
 .../physical/impl/limit/TestLimitOperator.java     | 23 ++++++++++++++
 .../physical/impl/limit/TestLimitPlanning.java     | 32 ++++++++++++++++++++
 .../org/apache/drill/test/DrillTestWrapper.java    | 24 ++++++++++++---
 .../java/org/apache/drill/test/TestBuilder.java    | 35 ++++++++++------------
 5 files changed, 105 insertions(+), 30 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java
index 66126ec..1053941 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PushLimitToTopN.java
@@ -32,19 +32,28 @@ public class PushLimitToTopN  extends Prule{
 
   @Override
   public boolean matches(RelOptRuleCall call) {
-    return PrelUtil.getPlannerSettings(call.getPlanner()).getOptions()
-      .getOption(PlannerSettings.TOPN.getOptionName()).bool_val;
+    boolean topNEnabled = PrelUtil.getPlannerSettings(call.getPlanner()).getOptions().getOption(PlannerSettings.TOPN.getOptionName()).bool_val;
+
+    if (!topNEnabled) {
+      return false;
+    } else {
+      // If no limit is defined it doesn't make sense to use TopN since it could use unbounded memory in this case.
+      // We should use the sort and limit operator in this case.
+      // This also fixes DRILL-6474
+      final LimitPrel limit = call.rel(0);
+      return limit.getFetch() != null;
+    }
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    final LimitPrel limit = (LimitPrel) call.rel(0);
-    final SingleMergeExchangePrel smex = (SingleMergeExchangePrel) call.rel(1);
-    final SortPrel sort = (SortPrel) call.rel(2);
+    final LimitPrel limit = call.rel(0);
+    final SingleMergeExchangePrel smex = call.rel(1);
+    final SortPrel sort = call.rel(2);
 
     // First offset to include into results (inclusive). Null implies it is starting from offset 0
     int offset = limit.getOffset() != null ? Math.max(0, RexLiteral.intValue(limit.getOffset())) : 0;
-    int fetch = limit.getFetch() != null?  Math.max(0, RexLiteral.intValue(limit.getFetch())) : 0;
+    int fetch = Math.max(0, RexLiteral.intValue(limit.getFetch()));
 
     final TopNPrel topN = new TopNPrel(limit.getCluster(), sort.getTraitSet(), sort.getInput(), offset + fetch, sort.getCollation());
     final LimitPrel newLimit = new LimitPrel(limit.getCluster(), limit.getTraitSet(),
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
index 22c0013..7225edc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitOperator.java
@@ -20,12 +20,35 @@ package org.apache.drill.exec.physical.impl.limit;
 import com.google.common.collect.Lists;
 import org.apache.drill.exec.physical.config.Limit;
 import org.apache.drill.exec.physical.unit.PhysicalOpUnitTestBase;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.List;
 
 public class TestLimitOperator extends PhysicalOpUnitTestBase {
 
+  @Rule
+  public BaseDirTestWatcher baseDirTestWatcher = new BaseDirTestWatcher();
+
+  // DRILL-6474
+  @Test
+  public void testLimitIntegrationTest() throws Exception {
+    final ClusterFixtureBuilder builder = new ClusterFixtureBuilder(baseDirTestWatcher);
+
+    try (ClusterFixture clusterFixture = builder.build();
+         ClientFixture clientFixture = clusterFixture.clientFixture()) {
+      clientFixture.testBuilder()
+        .sqlQuery("select name_s10 from `mock`.`employees_100000` order by name_s10 offset 100")
+        .expectsNumRecords(99900)
+        .build()
+        .run();
+    }
+  }
+
   @Test
   public void testLimitMoreRecords() {
     Limit limitConf = new Limit(null, 0, 10);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitPlanning.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitPlanning.java
new file mode 100644
index 0000000..3f5fee2
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitPlanning.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.limit;
+
+import org.apache.drill.PlanTestBase;
+import org.junit.Test;
+
+public class TestLimitPlanning extends PlanTestBase {
+
+  // DRILL-6474
+  @Test
+  public void dontPushdownIntoTopNWhenNoLimit() throws Exception {
+    String query = "select full_name from cp.`employee.json` order by full_name offset 10";
+
+    PlanTestBase.testPlanMatchingPatterns(query, new String[]{".*Sort\\(.*"}, new String[]{".*TopN\\(.*"});
+  }
+}
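
    As a hypothetical companion to the planning test above (not part of this commit): once
    an explicit LIMIT is present, PushLimitToTopN is expected to match again, so the plan
    should contain a TopN. A sketch of what such a check could look like, assuming the same
    PlanTestBase helper and cp.`employee.json` sample used elsewhere in these tests:

        package org.apache.drill.exec.physical.impl.limit;

        import org.apache.drill.PlanTestBase;
        import org.junit.Test;

        public class TestTopNWithLimitSketch extends PlanTestBase {

          // Inverse of dontPushdownIntoTopNWhenNoLimit: with LIMIT and OFFSET both given,
          // the rule should fire and a TopN operator should show up in the physical plan.
          @Test
          public void pushIntoTopNWhenLimitPresent() throws Exception {
            String query = "select full_name from cp.`employee.json` order by full_name limit 10 offset 10";

            PlanTestBase.testPlanMatchingPatterns(query, new String[]{".*TopN\\(.*"}, new String[]{});
          }
        }
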
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
index 0dfc1f7..051f4b3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/DrillTestWrapper.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
@@ -85,6 +86,7 @@ public class DrillTestWrapper {
 
   // Unit test doesn't expect any specific batch count
   public static final int EXPECTED_BATCH_COUNT_NOT_SET = -1;
+  public static final int EXPECTED_NUM_RECORDS_NOT_SET = - 1;
 
   // The motivation behind the TestBuilder was to provide a clean API for test writers. The model is mostly designed to
   // prepare all of the components necessary for running the tests, before the TestWrapper is initialized. There is however
@@ -119,11 +121,13 @@ public class DrillTestWrapper {
   private List<Map<String, Object>> baselineRecords;
 
   private int expectedNumBatches;
+  private int expectedNumRecords;
 
   public DrillTestWrapper(TestBuilder testBuilder, TestServices services, Object query, QueryType queryType,
       String baselineOptionSettingQueries, String testOptionSettingQueries,
       QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
-      String[] baselineColumns, List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
+      String[] baselineColumns, List<Map<String, Object>> baselineRecords, int expectedNumBatches,
+      int expectedNumRecords) {
     this.testBuilder = testBuilder;
     this.services = services;
     this.query = query;
@@ -136,6 +140,13 @@ public class DrillTestWrapper {
     this.baselineColumns = baselineColumns;
     this.baselineRecords = baselineRecords;
     this.expectedNumBatches = expectedNumBatches;
+    this.expectedNumRecords = expectedNumRecords;
+
+    Preconditions.checkArgument(!(baselineRecords != null && !ordered && highPerformanceComparison));
+    Preconditions.checkArgument((baselineRecords != null && expectedNumRecords == DrillTestWrapper.EXPECTED_NUM_RECORDS_NOT_SET) || baselineRecords == null,
+      "Cannot define both baselineRecords and the expectedNumRecords.");
+    Preconditions.checkArgument((baselineQueryType != null && expectedNumRecords == DrillTestWrapper.EXPECTED_NUM_RECORDS_NOT_SET) || baselineQueryType == null,
+      "Cannot define both a baselineQueryType and the expectedNumRecords.");
   }
 
   public void run() throws Exception {
@@ -527,9 +538,14 @@ public class DrillTestWrapper {
       // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
       // the cases where the baseline is stored in a file.
       if (baselineRecords == null) {
-        test(baselineOptionSettingQueries);
-        expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
-        addToMaterializedResults(expectedRecords, expected, loader);
+        if (expectedNumRecords != DrillTestWrapper.EXPECTED_NUM_RECORDS_NOT_SET) {
+          Assert.assertEquals(expectedNumRecords, actualRecords.size());
+          return;
+        } else {
+          test(baselineOptionSettingQueries);
+          expected = testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
+          addToMaterializedResults(expectedRecords, expected, loader);
+        }
       } else {
         expectedRecords = baselineRecords;
       }
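
    The new constructor checks in DrillTestWrapper above enforce that a test supplies at
    most one baseline mechanism when expectsNumRecords() is used. The same invariants can
    be read more directly when flattened; the class below is a standalone illustration
    (the names mirror the commit, but the helper itself is hypothetical):

        import com.google.common.base.Preconditions;

        class BaselineInvariantsSketch {
          static final int EXPECTED_NUM_RECORDS_NOT_SET = -1;

          static void check(Object baselineRecords, Object baselineQueryType,
                            boolean ordered, boolean highPerformanceComparison, int expectedNumRecords) {
            // High-performance comparison is only meaningful for ordered baselines.
            Preconditions.checkArgument(!(baselineRecords != null && !ordered && highPerformanceComparison));
            // baselineRecords and expectedNumRecords are mutually exclusive ...
            Preconditions.checkArgument(baselineRecords == null || expectedNumRecords == EXPECTED_NUM_RECORDS_NOT_SET,
                "Cannot define both baselineRecords and the expectedNumRecords.");
            // ... and so are baselineQueryType and expectedNumRecords.
            Preconditions.checkArgument(baselineQueryType == null || expectedNumRecords == EXPECTED_NUM_RECORDS_NOT_SET,
                "Cannot define both a baselineQueryType and the expectedNumRecords.");
          }
        }
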
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java
index e40f86d..98a0a9a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/TestBuilder.java
@@ -91,6 +91,7 @@ public class TestBuilder {
   private List<Map<String, Object>> baselineRecords;
 
   private int expectedNumBatches = DrillTestWrapper.EXPECTED_BATCH_COUNT_NOT_SET;
+  private int expectedNumRecords = DrillTestWrapper.EXPECTED_NUM_RECORDS_NOT_SET;
 
   public TestBuilder(TestServices services) {
     this.services = services;
@@ -127,12 +128,9 @@ public class TestBuilder {
     return this;
   }
 
-  public DrillTestWrapper build() throws Exception {
-    if ( ! ordered && highPerformanceComparison ) {
-      throw new Exception("High performance comparison only available for ordered checks, to enforce this restriction, ordered() must be called first.");
-    }
+  public DrillTestWrapper build() {
     return new DrillTestWrapper(this, services, query, queryType, baselineOptionSettingQueries, testOptionSettingQueries,
-        getValidationQueryType(), ordered, highPerformanceComparison, baselineColumns, baselineRecords, expectedNumBatches);
+        getValidationQueryType(), ordered, highPerformanceComparison, baselineColumns, baselineRecords, expectedNumBatches, expectedNumRecords);
   }
 
   public List<Pair<SchemaPath, TypeProtos.MajorType>> getExpectedSchema() {
@@ -248,17 +246,8 @@ public class TestBuilder {
     throw new RuntimeException("Must provide some kind of baseline, either a baseline file or another query");
   }
 
-  protected UserBitShared.QueryType getValidationQueryType() throws Exception {
-    if (singleExplicitBaselineRecord()) {
-      return null;
-    }
-
-    if (ordered) {
-      // If there are no base line records or no baseline query then we will check to make sure that the records are in ascending order
-      return null;
-    }
-
-    throw new RuntimeException("Must provide some kind of baseline, either a baseline file or another query");
+  protected UserBitShared.QueryType getValidationQueryType() {
+    return null;
   }
 
   public JSONTestBuilder jsonBaselineFile(String filePath) {
@@ -329,6 +318,12 @@ public class TestBuilder {
     return this;
   }
 
+  public TestBuilder expectsNumRecords(int expectedNumRecords) {
+    this.expectedNumRecords = expectedNumRecords;
+    this.ordered = false;
+    return this;
+  }
+
   /**
    * This method is used to pass in a simple list of values for a single record verification without
    * the need to create a CSV or JSON file to store the baseline.
@@ -544,7 +539,7 @@ public class TestBuilder {
     }
 
     @Override
-    protected UserBitShared.QueryType getValidationQueryType() throws Exception {
+    protected UserBitShared.QueryType getValidationQueryType() {
       return UserBitShared.QueryType.SQL;
     }
   }
@@ -577,7 +572,7 @@ public class TestBuilder {
     }
 
     @Override
-    protected UserBitShared.QueryType getValidationQueryType() throws Exception {
+    protected UserBitShared.QueryType getValidationQueryType() {
       return null;
     }
 
@@ -608,7 +603,7 @@ public class TestBuilder {
     }
 
     @Override
-    protected UserBitShared.QueryType getValidationQueryType() throws Exception {
+    protected UserBitShared.QueryType getValidationQueryType() {
       return UserBitShared.QueryType.SQL;
     }
 
@@ -639,7 +634,7 @@ public class TestBuilder {
     }
 
     @Override
-    protected UserBitShared.QueryType getValidationQueryType() throws Exception {
+    protected UserBitShared.QueryType getValidationQueryType() {
       return baselineQueryType;
     }
 
