You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ay...@apache.org on 2023/01/11 10:05:04 UTC
[hive] branch master updated: HIVE-26913: Iceberg: HiveVectorizedReader::parquetRecordReader should reuse footer information. (#3933). (Ayush Saxena, reviewed by Rajesh Balamohan)
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new afa696fde08 HIVE-26913: Iceberg: HiveVectorizedReader::parquetRecordReader should reuse footer information. (#3933). (Ayush Saxena, reviewed by Rajesh Balamohan)
afa696fde08 is described below
commit afa696fde08d98e6d8bcb7d07a423a0e9c8a8fc9
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Wed Jan 11 15:34:50 2023 +0530
HIVE-26913: Iceberg: HiveVectorizedReader::parquetRecordReader should reuse footer information. (#3933). (Ayush Saxena, reviewed by Rajesh Balamohan)
---
.../apache/iceberg/mr/hive/vector/HiveVectorizedReader.java | 8 ++++----
.../mr/hive/vector/ParquetSchemaFieldNameVisitor.java | 8 ++++----
.../apache/iceberg/parquet/ParquetFooterInputFromCache.java | 8 ++++----
iceberg/iceberg-shading/pom.xml | 4 ----
.../hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java | 4 ++--
.../hive/ql/io/parquet/VectorizedParquetInputFormat.java | 8 +++++++-
.../hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java | 5 +----
.../ql/io/parquet/vector/VectorizedParquetRecordReader.java | 13 ++++++++-----
8 files changed, 30 insertions(+), 28 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index b1a3a197709..e3d206189af 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -43,10 +43,6 @@ import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hive.iceberg.org.apache.orc.OrcConf;
-import org.apache.hive.iceberg.org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.hive.iceberg.org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.hive.iceberg.org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.hive.iceberg.org.apache.parquet.schema.MessageType;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
@@ -67,6 +63,10 @@ import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.orc.impl.OrcTail;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
/**
* Utility class to create vectorized readers for Hive.
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/ParquetSchemaFieldNameVisitor.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/ParquetSchemaFieldNameVisitor.java
index ff455d8418a..37b089e7b5e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/ParquetSchemaFieldNameVisitor.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/ParquetSchemaFieldNameVisitor.java
@@ -20,16 +20,16 @@ package org.apache.iceberg.mr.hive.vector;
import java.util.List;
import java.util.Map;
-import org.apache.hive.iceberg.org.apache.parquet.schema.GroupType;
-import org.apache.hive.iceberg.org.apache.parquet.schema.MessageType;
-import org.apache.hive.iceberg.org.apache.parquet.schema.PrimitiveType;
-import org.apache.hive.iceberg.org.apache.parquet.schema.Type;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
/**
* Collects the top level field names from Parquet schema. During schema visit it translates the expected schema's
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java
index bd4bd74106f..64eae4ccfaf 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/ParquetFooterInputFromCache.java
@@ -25,13 +25,13 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
-import org.apache.hive.iceberg.org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.hive.iceberg.org.apache.parquet.io.InputFile;
-import org.apache.hive.iceberg.org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
/**
* Copy of ParquetFooterInputFromCache from hive-exec module to switch dependent Parquet packages
- * to the shaded version (org.apache.hive.iceberg.org.apache.parquet.io...)
+ * to the unshaded version (org.apache.parquet.io...), since the Parquet relocation was removed
*
* The Parquet InputFile implementation that allows the reader to
* read the footer from cache without being aware of the latter.
diff --git a/iceberg/iceberg-shading/pom.xml b/iceberg/iceberg-shading/pom.xml
index 2cfec8e5f07..950383ba604 100644
--- a/iceberg/iceberg-shading/pom.xml
+++ b/iceberg/iceberg-shading/pom.xml
@@ -86,10 +86,6 @@
<pattern>org.apache.orc</pattern>
<shadedPattern>${shade.relocation.package.prefix}.org.apache.orc</shadedPattern>
</relocation>
- <relocation>
- <pattern>org.apache.parquet</pattern>
- <shadedPattern>${shade.relocation.package.prefix}.org.apache.parquet</shadedPattern>
- </relocation>
<relocation>
<pattern>shaded.parquet</pattern>
<shadedPattern>${shade.relocation.package.prefix}.shaded.parquet</shadedPattern>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 529c13871e3..9b817eb3642 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -81,10 +81,10 @@ public abstract class ParquetRecordReaderBase {
this.filePath = fileSplit.getPath();
}
- protected void setupMetadataAndParquetSplit(JobConf conf) throws IOException {
+ protected void setupMetadataAndParquetSplit(JobConf conf, ParquetMetadata metadata) throws IOException {
// In the case of stat tasks a dummy split is created with -1 length but real path...
if (fileSplit.getLength() != 0) {
- parquetMetadata = getParquetMetadata(filePath, conf);
+ this.parquetMetadata = metadata != null ? metadata : getParquetMetadata(filePath, conf);
parquetInputSplit = getSplit(conf);
}
// having null as parquetInputSplit seems to be a valid case based on this file's history
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
index 6c46928eecc..4dd0deee1d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
/**
* Vectorized input format for Parquet files
@@ -38,6 +39,7 @@ public class VectorizedParquetInputFormat
private FileMetadataCache metadataCache = null;
private DataCache dataCache = null;
private Configuration cacheConf = null;
+ private ParquetMetadata metadata;
public VectorizedParquetInputFormat() {
}
@@ -48,7 +50,11 @@ public class VectorizedParquetInputFormat
JobConf jobConf,
Reporter reporter) throws IOException {
return new VectorizedParquetRecordReader(
- inputSplit, jobConf, metadataCache, dataCache, cacheConf);
+ inputSplit, jobConf, metadataCache, dataCache, cacheConf, metadata);
+ }
+
+ public void setMetadata(ParquetMetadata metadata) throws IOException {
+ this.metadata = metadata;
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index aebcd247354..2d19f5d4c0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -22,8 +22,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
-import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
-import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
@@ -35,7 +33,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.parquet.hadoop.ParquetInputFormat;
-import org.apache.parquet.hadoop.ParquetInputSplit;
import org.apache.parquet.hadoop.util.ContextUtil;
public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
@@ -59,7 +56,7 @@ public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
throws IOException, InterruptedException {
super(oldJobConf, oldSplit);
- setupMetadataAndParquetSplit(oldJobConf);
+ setupMetadataAndParquetSplit(oldJobConf, null);
this.splitLen = oldSplit.getLength();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 33e828ff18b..b56990d81fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -151,10 +151,8 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
this(oldInputSplit, conf, null, null, null);
}
- public VectorizedParquetRecordReader(
- InputSplit oldInputSplit, JobConf conf,
- FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf)
- throws IOException {
+ public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf, FileMetadataCache metadataCache,
+ DataCache dataCache, Configuration cacheConf, ParquetMetadata parquetMetadata) throws IOException {
super(conf, oldInputSplit);
try {
this.metadataCache = metadataCache;
@@ -177,7 +175,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
}
}
- setupMetadataAndParquetSplit(conf);
+ setupMetadataAndParquetSplit(conf, parquetMetadata);
colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
//initialize the rowbatchContext
@@ -194,6 +192,11 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
}
}
+ public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf, FileMetadataCache metadataCache,
+ DataCache dataCache, Configuration cacheConf) throws IOException {
+ this(oldInputSplit, conf, metadataCache, dataCache, cacheConf, null);
+ }
+
private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException {
int partitionColumnCount = rbCtx.getPartitionColumnCount();
if (partitionColumnCount > 0) {