Posted to commits@hive.apache.org by ay...@apache.org on 2023/01/11 10:05:04 UTC

[hive] branch master updated: HIVE-26913: Iceberg: HiveVectorizedReader::parquetRecordReader should reuse footer information. (#3933). (Ayush Saxena, reviewed by Rajesh Balamohan)

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new afa696fde08 HIVE-26913: Iceberg: HiveVectorizedReader::parquetRecordReader should reuse footer information. (#3933). (Ayush Saxena, reviewed by Rajesh Balamohan)
afa696fde08 is described below

commit afa696fde08d98e6d8bcb7d07a423a0e9c8a8fc9
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Jan 11 15:34:50 2023 +0530

    HIVE-26913: Iceberg: HiveVectorizedReader::parquetRecordReader should reuse footer information. (#3933). (Ayush Saxena, reviewed by Rajesh Balamohan)
---
 .../apache/iceberg/mr/hive/vector/HiveVectorizedReader.java |  8 ++++----
 .../mr/hive/vector/ParquetSchemaFieldNameVisitor.java       |  8 ++++----
 .../apache/iceberg/parquet/ParquetFooterInputFromCache.java |  8 ++++----
 iceberg/iceberg-shading/pom.xml                             |  4 ----
 .../hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java  |  4 ++--
 .../hive/ql/io/parquet/VectorizedParquetInputFormat.java    |  8 +++++++-
 .../hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java |  5 +----
 .../ql/io/parquet/vector/VectorizedParquetRecordReader.java | 13 ++++++++-----
 8 files changed, 30 insertions(+), 28 deletions(-)
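
In short: before this change the vectorized Parquet reader parsed the file footer itself even when the caller, such as HiveVectorizedReader, had already read it (for example out of the file metadata cache). The patch threads an optional ParquetMetadata through VectorizedParquetInputFormat and VectorizedParquetRecordReader so the footer is read once and reused. A minimal caller-side sketch of the new flow, assuming placeholder variables (conf, filePath, split, jobConf, reporter) and ignoring exception handling:

    // Read the footer once up front with the classic parquet-hadoop API;
    // these are the same classes the patch now imports in HiveVectorizedReader.
    ParquetMetadata footer =
        ParquetFileReader.readFooter(conf, filePath, ParquetMetadataConverter.NO_FILTER);

    // Hand the footer to the input format; the record reader it creates will
    // reuse it instead of parsing the footer from the file a second time.
    VectorizedParquetInputFormat inputFormat = new VectorizedParquetInputFormat();
    inputFormat.setMetadata(footer);
    RecordReader<NullWritable, VectorizedRowBatch> reader =
        inputFormat.getRecordReader(split, jobConf, reporter);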

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index b1a3a197709..e3d206189af 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -43,10 +43,6 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hive.iceberg.org.apache.orc.OrcConf;
-import org.apache.hive.iceberg.org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.hive.iceberg.org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.hive.iceberg.org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.hive.iceberg.org.apache.parquet.schema.MessageType;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
@@ -67,6 +63,10 @@ import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.impl.OrcTail;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
 
 /**
  * Utility class to create vectorized readers for Hive.
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/ParquetSchemaFieldNameVisitor.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/ParquetSchemaFieldNameVisitor.java
index ff455d8418a..37b089e7b5e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/ParquetSchemaFieldNameVisitor.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/ParquetSchemaFieldNameVisitor.java
@@ -20,16 +20,16 @@ package org.apache.iceberg.mr.hive.vector;
 
 import java.util.List;
 import java.util.Map;
-import org.apache.hive.iceberg.org.apache.parquet.schema.GroupType;
-import org.apache.hive.iceberg.org.apache.parquet.schema.MessageType;
-import org.apache.hive.iceberg.org.apache.parquet.schema.PrimitiveType;
-import org.apache.hive.iceberg.org.apache.parquet.schema.Type;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 
 /**
  * Collects the top level field names from Parquet schema. During schema visit it translates the expected schema's
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java
index bd4bd74106f..64eae4ccfaf 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java
@@ -25,13 +25,13 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
 import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
-import org.apache.hive.iceberg.org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.hive.iceberg.org.apache.parquet.io.InputFile;
-import org.apache.hive.iceberg.org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
 
 /**
  * Copy of ParquetFooterInputFromCache from hive-exec module to switch dependent Parquet packages
- * to the shaded version (org.apache.hive.iceberg.org.apache.parquet.io...)
+ * to the unshaded version (org.apache.parquet.io...)
  *
  * The Parquet InputFile implementation that allows the reader to
  * read the footer from cache without being aware of the latter.
diff --git a/iceberg/iceberg-shading/pom.xml b/iceberg/iceberg-shading/pom.xml
index 2cfec8e5f07..950383ba604 100644
--- a/iceberg/iceberg-shading/pom.xml
+++ b/iceberg/iceberg-shading/pom.xml
@@ -86,10 +86,6 @@
                       <pattern>org.apache.orc</pattern>
                       <shadedPattern>${shade.relocation.package.prefix}.org.apache.orc</shadedPattern>
                     </relocation>
-                    <relocation>
-                      <pattern>org.apache.parquet</pattern>
-                      <shadedPattern>${shade.relocation.package.prefix}.org.apache.parquet</shadedPattern>
-                    </relocation>
                     <relocation>
                       <pattern>shaded.parquet</pattern>
                       <shadedPattern>${shade.relocation.package.prefix}.shaded.parquet</shadedPattern>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 529c13871e3..9b817eb3642 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -81,10 +81,10 @@ public abstract class ParquetRecordReaderBase {
     this.filePath = fileSplit.getPath();
   }
 
-  protected void setupMetadataAndParquetSplit(JobConf conf) throws IOException {
+  protected void setupMetadataAndParquetSplit(JobConf conf, ParquetMetadata metadata) throws IOException {
     // In the case of stat tasks a dummy split is created with -1 length but real path...
     if (fileSplit.getLength() != 0) {
-      parquetMetadata = getParquetMetadata(filePath, conf);
+      this.parquetMetadata = metadata != null ? metadata : getParquetMetadata(filePath, conf);
       parquetInputSplit = getSplit(conf);
     }
     // having null as parquetInputSplit seems to be a valid case based on this file's history
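
The null fallback above is what keeps the change backward compatible: callers that pass null (such as ParquetRecordReaderWrapper below) still read the footer from the file exactly as before, while callers that already hold a footer skip the second read. The two call patterns, with cachedFooter as a placeholder name:

    setupMetadataAndParquetSplit(conf, null);          // old behavior: footer is read from the file
    setupMetadataAndParquetSplit(conf, cachedFooter);  // new: an already-parsed footer is reused
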
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
index 6c46928eecc..4dd0deee1d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 
 /**
  * Vectorized input format for Parquet files
@@ -38,6 +39,7 @@ public class VectorizedParquetInputFormat
   private FileMetadataCache metadataCache = null;
   private DataCache dataCache = null;
   private Configuration cacheConf = null;
+  private ParquetMetadata metadata;
 
   public VectorizedParquetInputFormat() {
   }
@@ -48,7 +50,11 @@ public class VectorizedParquetInputFormat
     JobConf jobConf,
     Reporter reporter) throws IOException {
     return new VectorizedParquetRecordReader(
-        inputSplit, jobConf, metadataCache, dataCache, cacheConf);
+        inputSplit, jobConf, metadataCache, dataCache, cacheConf, metadata);
+  }
+
+  public void setMetadata(ParquetMetadata metadata) throws IOException {
+    this.metadata = metadata;
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index aebcd247354..2d19f5d4c0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -22,8 +22,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
-import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
-import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -35,7 +33,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.apache.parquet.hadoop.ParquetInputSplit;
 import org.apache.parquet.hadoop.util.ContextUtil;
 
 public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
@@ -59,7 +56,7 @@ public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
           throws IOException, InterruptedException {
     super(oldJobConf, oldSplit);
 
-    setupMetadataAndParquetSplit(oldJobConf);
+    setupMetadataAndParquetSplit(oldJobConf, null);
 
     this.splitLen = oldSplit.getLength();
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 33e828ff18b..b56990d81fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -151,10 +151,8 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     this(oldInputSplit, conf, null, null, null);
   }
 
-  public VectorizedParquetRecordReader(
-      InputSplit oldInputSplit, JobConf conf,
-      FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf)
-      throws IOException {
+  public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf, FileMetadataCache metadataCache,
+      DataCache dataCache, Configuration cacheConf, ParquetMetadata parquetMetadata) throws IOException {
     super(conf, oldInputSplit);
     try {
       this.metadataCache = metadataCache;
@@ -177,7 +175,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
         }
       }
 
-      setupMetadataAndParquetSplit(conf);
+      setupMetadataAndParquetSplit(conf, parquetMetadata);
 
       colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
       //initialize the rowbatchContext
@@ -194,6 +192,11 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     }
   }
 
+  public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf, FileMetadataCache metadataCache,
+      DataCache dataCache, Configuration cacheConf) throws IOException {
+    this(oldInputSplit, conf, metadataCache, dataCache, cacheConf, null);
+  }
+
   private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException {
      int partitionColumnCount = rbCtx.getPartitionColumnCount();
      if (partitionColumnCount > 0) {
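
The six-argument constructor added above is now the full entry point; the five-argument form simply delegates with a null footer, so existing callers compile and behave unchanged. A direct-construction sketch, with footer as the (possibly null) pre-read ParquetMetadata and the remaining arguments as placeholders:

    VectorizedParquetRecordReader reader = new VectorizedParquetRecordReader(
        oldInputSplit, jobConf, metadataCache, dataCache, cacheConf, footer);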