Posted to commits@hive.apache.org by xu...@apache.org on 2017/11/01 01:31:39 UTC

hive git commit: HIVE-17696: Vectorized reader does not seem to be pushing down projection columns in certain code paths (Ferdinand Xu, via Vihang Karajgaonkar)

Repository: hive
Updated Branches:
  refs/heads/branch-2 64e91e706 -> 98079c063


HIVE-17696: Vectorized reader does not seem to be pushing down projection columns in certain code paths (Ferdinand Xu, via Vihang Karajgaonkar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/98079c06
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/98079c06
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/98079c06

Branch: refs/heads/branch-2
Commit: 98079c063a2bfbb9fe167794f22cb362a50909e6
Parents: 64e91e7
Author: Ferdinand Xu <ch...@intel.com>
Authored: Thu Oct 26 15:06:38 2017 +0800
Committer: Ferdinand Xu <ch...@intel.com>
Committed: Wed Nov 1 09:16:21 2017 +0800

----------------------------------------------------------------------
 .../parquet/read/DataWritableReadSupport.java   | 89 ++++++++++++++------
 .../vector/VectorizedParquetRecordReader.java   | 24 +-----
 2 files changed, 67 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
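For readers following the change: the commit moves the projection-schema computation into DataWritableReadSupport so that the row-mode and vectorized readers share one code path instead of two hand-maintained copies. Below is a minimal sketch, not part of the commit, showing how the new public helper getRequestedSchema(...) is driven by the column-projection settings that ColumnProjectionUtils stores in the job Configuration; the schema string and column names are made up for illustration.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class ProjectionSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Project only columns 0 and 1; appendReadColumns also clears the
        // read-all-columns flag, so isReadAllColumns(conf) becomes false.
        ColumnProjectionUtils.appendReadColumns(conf, Arrays.asList(0, 1));

        // Illustrative file schema; a real reader takes this from the footer.
        MessageType fileSchema = MessageTypeParser.parseMessageType(
            "message hive_schema {"
                + " optional int32 id;"
                + " optional binary name (UTF8);"
                + " optional binary payload (UTF8); }");

        List<String> names = Arrays.asList("id", "name", "payload");
        List<TypeInfo> types =
            TypeInfoUtils.getTypeInfosFromTypeString("int,string,string");

        // indexAccess=false resolves columns by name; the result keeps only
        // "id" and "name", so "payload" is never materialized.
        MessageType requested = DataWritableReadSupport.getRequestedSchema(
            false, names, types, fileSchema, conf);
        System.out.println(requested);
      }
    }
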


http://git-wip-us.apache.org/repos/asf/hive/blob/98079c06/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
index 604cbbc..8645d51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -16,7 +16,6 @@ package org.apache.hadoop.hive.ql.io.parquet.read;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
@@ -354,34 +353,15 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> {
       String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
       List<TypeInfo> columnTypesList = getColumnTypes(columnTypes);
 
-      MessageType tableSchema;
-      if (indexAccess) {
-        List<Integer> indexSequence = new ArrayList<Integer>();
-
-        // Generates a sequence list of indexes
-        for(int i = 0; i < columnNamesList.size(); i++) {
-          indexSequence.add(i);
-        }
-
-        tableSchema = getSchemaByIndex(fileSchema, columnNamesList, indexSequence);
-      } else {
-
-        tableSchema = getSchemaByName(fileSchema, columnNamesList, columnTypesList);
-      }
+      MessageType tableSchema =
+        getRequestedSchemaForIndexAccess(indexAccess, columnNamesList, columnTypesList, fileSchema);
 
       contextMetadata.put(HIVE_TABLE_AS_PARQUET_SCHEMA, tableSchema.toString());
       contextMetadata.put(PARQUET_COLUMN_INDEX_ACCESS, String.valueOf(indexAccess));
       this.hiveTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
 
-      Set<String> groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration);
-      List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
-      if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
-        MessageType requestedSchemaByUser = getProjectedSchema(tableSchema, columnNamesList,
-          indexColumnsWanted, groupPaths);
-        return new ReadContext(requestedSchemaByUser, contextMetadata);
-      } else {
-        return new ReadContext(tableSchema, contextMetadata);
-      }
+      return new ReadContext(getRequestedPrunedSchema(columnNamesList, tableSchema, configuration),
+        contextMetadata);
     } else {
       contextMetadata.put(HIVE_TABLE_AS_PARQUET_SCHEMA, fileSchema.toString());
       return new ReadContext(fileSchema, contextMetadata);
@@ -389,6 +369,67 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> {
   }
 
   /**
+   * Computes the Parquet schema requested by the vectorized code path.
+   * @param indexAccess whether columns are resolved by position instead of by name
+   * @param columnNamesList Hive column names from the job configuration
+   * @param columnTypesList Hive column types corresponding to columnNamesList
+   * @param fileSchema the Parquet schema of the file being read
+   * @param configuration the job configuration carrying the projection settings
+   * @return the schema pruned to the projected columns, or the file schema when all columns are read
+   */
+  public static MessageType getRequestedSchema(
+    boolean indexAccess,
+    List<String> columnNamesList,
+    List<TypeInfo> columnTypesList,
+    MessageType fileSchema,
+    Configuration configuration) {
+    MessageType tableSchema =
+      getRequestedSchemaForIndexAccess(indexAccess, columnNamesList, columnTypesList, fileSchema);
+
+    List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+    // TODO: This duplicates logic in the init method because the vectorized reader
+    // path does not support nested column pruning yet. See HIVE-15156.
+    if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
+      return DataWritableReadSupport
+        .getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted);
+    } else {
+      return fileSchema;
+    }
+  }
+
+  private static MessageType getRequestedSchemaForIndexAccess(
+    boolean indexAccess,
+    List<String> columnNamesList,
+    List<TypeInfo> columnTypesList,
+    MessageType fileSchema) {
+    if (indexAccess) {
+      List<Integer> indexSequence = new ArrayList<Integer>();
+
+      // Generates a sequential list of column indexes
+      for (int i = 0; i < columnNamesList.size(); i++) {
+        indexSequence.add(i);
+      }
+
+      return getSchemaByIndex(fileSchema, columnNamesList, indexSequence);
+    } else {
+      return getSchemaByName(fileSchema, columnNamesList, columnTypesList);
+    }
+  }
+
+  private static MessageType getRequestedPrunedSchema(
+    List<String> columnNamesList,
+    MessageType fileSchema,
+    Configuration configuration) {
+    Set<String> groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration);
+    List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+    if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
+      return getProjectedSchema(fileSchema, columnNamesList, indexColumnsWanted, groupPaths);
+    } else {
+      return fileSchema;
+    }
+  }
+
+  /**
    *
    * It creates the hive read support to interpret data from parquet to hive
    *

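An aside on the indexAccess flag threaded through the helpers above: it corresponds to the parquet.column.index.access setting recorded in the context metadata, and selects position-based rather than name-based column resolution. A hypothetical continuation of the earlier sketch, not part of the commit:

    // With parquet.column.index.access=true, Hive column i is mapped to file
    // column i by position, so the Hive-side names need not match the names
    // stored in the Parquet file.
    List<String> renamed = Arrays.asList("user_id", "user_name", "blob");
    MessageType byIndex = DataWritableReadSupport.getRequestedSchema(
        true, renamed, types, fileSchema, conf);
    // Still projects the first two file columns, even though the Hive names
    // ("user_id", "user_name") differ from the file names ("id", "name").
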
http://git-wip-us.apache.org/repos/asf/hive/blob/98079c06/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 846f7c5..9359098 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -206,29 +206,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     }
     this.fileSchema = footer.getFileMetaData().getSchema();
 
-    MessageType tableSchema;
-    if (indexAccess) {
-      List<Integer> indexSequence = new ArrayList<>();
-
-      // Generates a sequence list of indexes
-      for(int i = 0; i < columnNamesList.size(); i++) {
-        indexSequence.add(i);
-      }
-
-      tableSchema = DataWritableReadSupport.getSchemaByIndex(fileSchema, columnNamesList,
-        indexSequence);
-    } else {
-      tableSchema = DataWritableReadSupport.getSchemaByName(fileSchema, columnNamesList,
-        columnTypesList);
-    }
-
     indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
-    if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
-      requestedSchema =
-        DataWritableReadSupport.getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted);
-    } else {
-      requestedSchema = fileSchema;
-    }
+    requestedSchema = DataWritableReadSupport
+      .getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration);
 
     this.reader = new ParquetFileReader(
       configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
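
One way to see the effect of the consolidated path is to inspect which column chunks the reader will touch: ParquetFileReader is constructed with requestedSchema.getColumns(), so only the projected columns' chunks are read. Continuing the earlier sketch (again, not part of the commit):

    // requestedSchema.getColumns() lists one descriptor per projected leaf
    // column; this list is what bounds the reader's I/O.
    for (org.apache.parquet.column.ColumnDescriptor cd : requested.getColumns()) {
      System.out.println(Arrays.toString(cd.getPath()));
    }
    // Expected output for the sketch above: [id] and [name] only; payload's
    // column chunks are never fetched.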