Posted to commits@hive.apache.org by we...@apache.org on 2017/05/24 23:51:57 UTC

[23/54] [abbrv] hive git commit: HIVE-16672: Parquet vectorization doesn't work for tables with partition info (Colin Ma, reviewed by Ferdinand Xu)

HIVE-16672: Parquet vectorization doesn't work for tables with partition info (Colin Ma, reviewed by Ferdinand Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3be1eedb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3be1eedb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3be1eedb

Branch: refs/heads/hive-14535
Commit: 3be1eedb1860173cd2f90631fb2a29edefa4e89e
Parents: e1e48bc
Author: Ferdinand Xu <ch...@intel.com>
Authored: Fri May 19 14:01:04 2017 +0800
Committer: Ferdinand Xu <ch...@intel.com>
Committed: Fri May 19 14:01:04 2017 +0800

----------------------------------------------------------------------
 .../vector/VectorizedParquetRecordReader.java   |   31 +-
 .../vector_partitioned_date_time.q              |  113 +
 .../llap/vector_partitioned_date_time.q.out     | 2428 ++++++++++++++++++
 3 files changed, 2566 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
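
The heart of the fix is in VectorizedParquetRecordReader: resolve the split's
partition values from the VectorizedRowBatchCtx once per split, then stamp them
onto every outgoing batch, mirroring what VectorizedOrcInputFormat already does.
Below is a minimal Java sketch of that pattern, distilled from the patch that
follows; the helper class name is hypothetical and not part of Hive, but the
VectorizedRowBatchCtx calls are the ones the diff uses (assumes Hive's ql module
on the classpath).

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;

/**
 * Hypothetical helper (not Hive code) distilled from the patch below:
 * resolves partition values once per split, stamps them onto each batch.
 */
class PartitionColsFiller {
  private final VectorizedRowBatchCtx rbCtx;
  private Object[] partitionValues; // stays null for unpartitioned tables

  PartitionColsFiller(VectorizedRowBatchCtx rbCtx) {
    this.rbCtx = rbCtx;
  }

  // Once per split: materialize the partition key values for this file.
  void initPartitionValues(FileSplit split, JobConf conf) throws IOException {
    int partitionColumnCount = rbCtx.getPartitionColumnCount();
    if (partitionColumnCount > 0) {
      partitionValues = new Object[partitionColumnCount];
      VectorizedRowBatchCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
    } else {
      partitionValues = null;
    }
  }

  // Once per batch: write the constant partition values into their columns.
  void addToBatch(VectorizedRowBatch batch) {
    if (partitionValues != null) {
      rbCtx.addPartitionColsToBatch(batch, partitionValues);
    }
  }
}

Because the values are constant for the whole file, addPartitionColsToBatch can
fill the partition vectors as repeating values, so the per-batch overhead is
minimal.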


http://git-wip-us.apache.org/repos/asf/hive/blob/3be1eedb/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 312cdac..96d3847 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -73,6 +73,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private List<TypeInfo> columnTypesList;
   private VectorizedRowBatchCtx rbCtx;
   private List<Integer> indexColumnsWanted;
+  private Object[] partitionValues;
 
   /**
    * For each request column, the reader to read this column. This is NULL if this column
@@ -128,12 +129,23 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       }
       colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
       rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+      initPartitionValues((FileSplit) oldInputSplit, conf);
     } catch (Throwable e) {
       LOG.error("Failed to create the vectorized reader due to exception " + e);
       throw new RuntimeException(e);
     }
   }
 
+  private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException {
+    int partitionColumnCount = rbCtx.getPartitionColumnCount();
+    if (partitionColumnCount > 0) {
+      partitionValues = new Object[partitionColumnCount];
+      rbCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues);
+    } else {
+      partitionValues = null;
+    }
+  }
+
   public void initialize(
       ParquetInputSplit split,
       JobConf configuration) throws IOException, InterruptedException {
@@ -263,16 +275,23 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     if (rowsReturned >= totalRowCount) {
       return false;
     }
+
+    // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
+    if (partitionValues != null) {
+      rbCtx.addPartitionColsToBatch(columnarBatch, partitionValues);
+    }
     checkEndOfRowGroup();
 
     int num = (int) Math.min(VectorizedRowBatch.DEFAULT_SIZE, totalCountLoadedSoFar - rowsReturned);
-    for (int i = 0; i < columnReaders.length; ++i) {
-      if (columnReaders[i] == null) {
-        continue;
+    if (colsToInclude.size() > 0) {
+      for (int i = 0; i < columnReaders.length; ++i) {
+        if (columnReaders[i] == null) {
+          continue;
+        }
+        columnarBatch.cols[colsToInclude.get(i)].isRepeating = true;
+        columnReaders[i].readBatch(num, columnarBatch.cols[colsToInclude.get(i)],
+            columnTypesList.get(colsToInclude.get(i)));
       }
-      columnarBatch.cols[colsToInclude.get(i)].isRepeating = true;
-      columnReaders[i].readBatch(num, columnarBatch.cols[colsToInclude.get(i)],
-        columnTypesList.get(colsToInclude.get(i)));
     }
     rowsReturned += num;
     columnarBatch.size = num;
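
The other change in nextBatch() is the guard around the column-reader loop.
Once partition columns are populated from the batch context, a query that
touches only the partition key (like the q-file's "select fl_date, count(*)
... group by fl_date") projects no Parquet columns at all, so colsToInclude is
empty and the reader loop must be skipped entirely. A paraphrased sketch of
the guarded loop as a hypothetical static helper (VectorizedColumnReader and
TypeInfo are Hive's classes; the method shape is mine, not the reader's actual
structure):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedColumnReader;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

/** Hypothetical helper, not Hive code: the guarded read loop from the patch. */
final class GuardedColumnRead {
  // Read 'num' values per included column into the batch. Partition columns
  // are already filled by this point, so an empty projection is a no-op.
  static void readColumns(VectorizedColumnReader[] columnReaders,
                          List<Integer> colsToInclude,
                          List<TypeInfo> columnTypes,
                          VectorizedRowBatch batch,
                          int num) throws IOException {
    if (colsToInclude.isEmpty()) {
      return; // only partition columns were projected; nothing to read
    }
    for (int i = 0; i < columnReaders.length; ++i) {
      if (columnReaders[i] == null) {
        continue; // column not requested from this file
      }
      int out = colsToInclude.get(i); // destination slot in the row batch
      batch.cols[out].isRepeating = true;
      columnReaders[i].readBatch(num, batch.cols[out], columnTypes.get(out));
    }
  }
}

The q-file added below exercises exactly this path: it repeats the ORC
partitioned date/time tests against Parquet tables partitioned by a DATE and
by a TIMESTAMP column, with vectorization toggled off and on.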

http://git-wip-us.apache.org/repos/asf/hive/blob/3be1eedb/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q b/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q
index bf4c461..107fe7c 100644
--- a/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q
+++ b/ql/src/test/queries/clientpositive/vector_partitioned_date_time.q
@@ -126,3 +126,116 @@ explain vectorization expression
 select fl_time, count(*) from flights_tiny_orc_partitioned_timestamp group by fl_time;
 
 select fl_time, count(*) from flights_tiny_orc_partitioned_timestamp group by fl_time;
+
+-- test for Parquet file format
+CREATE TABLE flights_tiny_parquet STORED AS PARQUET AS
+SELECT origin_city_name, dest_city_name, fl_date, to_utc_timestamp(fl_date, 'America/Los_Angeles') as fl_time, arr_delay, fl_num
+FROM flights_tiny;
+
+SELECT * FROM flights_tiny_parquet;
+
+SET hive.vectorized.execution.enabled=false;
+
+select * from flights_tiny_parquet sort by fl_num, fl_date limit 25;
+
+select fl_date, count(*) from flights_tiny_parquet group by fl_date;
+
+SET hive.vectorized.execution.enabled=true;
+
+explain vectorization expression
+select * from flights_tiny_parquet sort by fl_num, fl_date limit 25;
+
+select * from flights_tiny_parquet sort by fl_num, fl_date limit 25;
+
+explain vectorization expression
+select fl_date, count(*) from flights_tiny_parquet group by fl_date;
+
+select fl_date, count(*) from flights_tiny_parquet group by fl_date;
+
+
+SET hive.vectorized.execution.enabled=false;
+
+CREATE TABLE flights_tiny_parquet_partitioned_date (
+  origin_city_name STRING,
+  dest_city_name STRING,
+  fl_time TIMESTAMP,
+  arr_delay FLOAT,
+  fl_num INT
+)
+PARTITIONED BY (fl_date DATE)
+STORED AS PARQUET;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+INSERT INTO TABLE flights_tiny_parquet_partitioned_date
+PARTITION (fl_date)
+SELECT  origin_city_name, dest_city_name, fl_time, arr_delay, fl_num, fl_date
+FROM flights_tiny_parquet;
+
+
+select * from flights_tiny_parquet_partitioned_date;
+
+select * from flights_tiny_parquet_partitioned_date sort by fl_num, fl_date limit 25;
+
+select fl_date, count(*) from flights_tiny_parquet_partitioned_date group by fl_date;
+
+SET hive.vectorized.execution.enabled=true;
+
+explain vectorization expression
+select * from flights_tiny_parquet_partitioned_date;
+
+select * from flights_tiny_parquet_partitioned_date;
+
+explain vectorization expression
+select * from flights_tiny_parquet_partitioned_date sort by fl_num, fl_date limit 25;
+
+select * from flights_tiny_parquet_partitioned_date sort by fl_num, fl_date limit 25;
+
+explain vectorization expression
+select fl_date, count(*) from flights_tiny_parquet_partitioned_date group by fl_date;
+
+select fl_date, count(*) from flights_tiny_parquet_partitioned_date group by fl_date;
+
+
+SET hive.vectorized.execution.enabled=false;
+
+CREATE TABLE flights_tiny_parquet_partitioned_timestamp (
+  origin_city_name STRING,
+  dest_city_name STRING,
+  fl_date DATE,
+  arr_delay FLOAT,
+  fl_num INT
+)
+PARTITIONED BY (fl_time TIMESTAMP)
+STORED AS PARQUET;
+
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+INSERT INTO TABLE flights_tiny_parquet_partitioned_timestamp
+PARTITION (fl_time)
+SELECT  origin_city_name, dest_city_name, fl_date, arr_delay, fl_num, fl_time
+FROM flights_tiny_parquet;
+
+
+select * from flights_tiny_parquet_partitioned_timestamp;
+
+select * from flights_tiny_parquet_partitioned_timestamp sort by fl_num, fl_time limit 25;
+
+select fl_time, count(*) from flights_tiny_parquet_partitioned_timestamp group by fl_time;
+
+SET hive.vectorized.execution.enabled=true;
+
+explain vectorization expression
+select * from flights_tiny_parquet_partitioned_timestamp;
+
+select * from flights_tiny_parquet_partitioned_timestamp;
+
+explain vectorization expression
+select * from flights_tiny_parquet_partitioned_timestamp sort by fl_num, fl_time limit 25;
+
+select * from flights_tiny_parquet_partitioned_timestamp sort by fl_num, fl_time limit 25;
+
+explain vectorization expression
+select fl_time, count(*) from flights_tiny_parquet_partitioned_timestamp group by fl_time;
+
+select fl_time, count(*) from flights_tiny_parquet_partitioned_timestamp group by fl_time;
\ No newline at end of file