Posted to commits@hive.apache.org by pv...@apache.org on 2022/06/13 15:21:03 UTC
[hive] branch master updated: HIVE-26307: Avoid FS init in FileIO::newInputFile in vectorized Iceberg reads (Peter Vary reviewed by Adam Szita) (#3354)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 76d4abe402a HIVE-26307: Avoid FS init in FileIO::newInputFile in vectorized Iceberg reads (Peter Vary reviewed by Adam Szita) (#3354)
76d4abe402a is described below
commit 76d4abe402abdebb6534e9db3f4209cce8b0d4e6
Author: pvary <pv...@cloudera.com>
AuthorDate: Mon Jun 13 17:20:53 2022 +0200
HIVE-26307: Avoid FS init in FileIO::newInputFile in vectorized Iceberg reads (Peter Vary reviewed by Adam Szita) (#3354)
---
.../mr/hive/vector/HiveVectorizedReader.java | 14 +-
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 148 ++++++++++-----------
.../apache/iceberg/orc/VectorizedReadUtils.java | 14 +-
3 files changed, 77 insertions(+), 99 deletions(-)
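The shape of the change, before the per-file hunks: the Hive vectorized read path now hands an org.apache.hadoop.fs.Path to the reader instead of an Iceberg InputFile, so no FileIO/FileSystem machinery is touched just to name the file. A minimal before/after sketch using the identifiers that appear in the hunks below (illustrative and simplified, not part of the commit; pre-patch the InputFile also went through the table's EncryptionManager, cf. the TODO in the second file):

    // Before: FileIO::newInputFile could initialize a Hadoop FileSystem eagerly,
    // even though the vectorized reader only needed the file's location.
    InputFile inputFile = table.io().newInputFile(task.file().path().toString());
    HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile, task, idToConstant, context);

    // After: constructing a Path is pure string/URI parsing; any FileSystem
    // lookup is deferred to the code that actually opens the file.
    Path path = new Path(task.file().path().toString());
    HIVE_VECTORIZED_READER_BUILDER.invoke(path, task, idToConstant, context);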
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 19fa0f06506..00b9b3c73f0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -51,7 +51,6 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
import org.apache.iceberg.orc.VectorizedReadUtils;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
@@ -72,11 +71,10 @@ public class HiveVectorizedReader {
}
- public static <D> CloseableIterable<D> reader(InputFile inputFile, FileScanTask task, Map<Integer, ?> idToConstant,
+ public static <D> CloseableIterable<D> reader(Path path, FileScanTask task, Map<Integer, ?> idToConstant,
TaskAttemptContext context) {
// Tweaks on jobConf here are relevant for this task only, so we need to copy it first as context's conf is reused.
- JobConf job = new JobConf((JobConf) context.getConfiguration());
- Path path = new Path(inputFile.location());
+ JobConf job = new JobConf(context.getConfiguration());
FileFormat format = task.file().format();
Reporter reporter = ((MapredIcebergInputFormat.CompatibilityTaskAttemptContextImpl) context).getLegacyReporter();
@@ -131,7 +129,7 @@ public class HiveVectorizedReader {
switch (format) {
case ORC:
- recordReader = orcRecordReader(job, reporter, task, inputFile, path, start, length, readColumnIds, fileId);
+ recordReader = orcRecordReader(job, reporter, task, path, start, length, readColumnIds, fileId);
break;
case PARQUET:
@@ -144,12 +142,12 @@ public class HiveVectorizedReader {
return createVectorizedRowBatchIterable(recordReader, job, partitionColIndices, partitionValues);
} catch (IOException ioe) {
- throw new RuntimeException("Error creating vectorized record reader for " + inputFile, ioe);
+ throw new RuntimeException("Error creating vectorized record reader for " + path, ioe);
}
}
private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(JobConf job, Reporter reporter,
- FileScanTask task, InputFile inputFile, Path path, long start, long length, List<Integer> readColumnIds,
+ FileScanTask task, Path path, long start, long length, List<Integer> readColumnIds,
SyntheticFileId fileId) throws IOException {
RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
@@ -159,7 +157,7 @@ public class HiveVectorizedReader {
// Metadata information has to be passed along in the OrcSplit. Without specifying this, the vectorized
// reader will assume that the ORC file ends at the task's start + length, and might fail reading the tail.
- ByteBuffer serializedOrcTail = VectorizedReadUtils.getSerializedOrcTail(inputFile, fileId, job);
+ ByteBuffer serializedOrcTail = VectorizedReadUtils.getSerializedOrcTail(path, fileId, job);
OrcTail orcTail = VectorizedReadUtils.deserializeToOrcTail(serializedOrcTail);
VectorizedReadUtils.handleIcebergProjection(task, job,
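With the InputFile parameter removed, HiveVectorizedReader.reader() is driven purely by the Path. A hedged caller sketch (it assumes the imports of the file above, that the type parameter D resolves to VectorizedRowBatch as produced by the ORC/Parquet record readers here, and that context is the MapredIcebergInputFormat.CompatibilityTaskAttemptContextImpl the method casts to internally):

    Path path = new Path(task.file().path().toString());
    // idToConstant: constant partition values, as built by
    // IcebergInputFormat#constantsMap in the next file
    CloseableIterable<VectorizedRowBatch> batches =
        HiveVectorizedReader.reader(path, task, idToConstant, context);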
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 7bbd03a09ed..7617c6b17e9 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -31,6 +31,7 @@ import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.JobConf;
@@ -44,6 +45,7 @@ import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTableScan;
import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
@@ -80,6 +82,7 @@ import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
@@ -217,7 +220,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
if (MetastoreUtil.hive3PresentOnClasspath()) {
HIVE_VECTORIZED_READER_BUILDER = DynMethods.builder("reader")
.impl(HIVE_VECTORIZED_READER_CLASS,
- InputFile.class,
+ Path.class,
FileScanTask.class,
Map.class,
TaskAttemptContext.class)
@@ -309,17 +312,33 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
currentIterator.close();
}
- private CloseableIterable<T> openTask(FileScanTask currentTask, Schema readSchema) {
- if (currentTask.isDataTask()) {
+ private CloseableIterable<T> openVectorized(FileScanTask task, Schema readSchema) {
+ Preconditions.checkArgument(!task.file().format().equals(FileFormat.AVRO),
+ "Vectorized execution is not yet supported for Iceberg avro tables. " +
+ "Please turn off vectorization and retry the query.");
+ Preconditions.checkArgument(MetastoreUtil.hive3PresentOnClasspath(),
+ "Vectorized read is unsupported for Hive 2 integration.");
+
+ Path path = new Path(task.file().path().toString());
+ Map<Integer, ?> idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant);
+ Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
+
+ // TODO: We have to take care of the EncryptionManager when LLAP and vectorization are used
+ CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(path, task, idToConstant, context);
+
+ return applyResidualFiltering(iterator, residual, readSchema);
+ }
+
+ private CloseableIterable<T> openGeneric(FileScanTask task, Schema readSchema) {
+ if (task.isDataTask()) {
// When querying metadata tables, the currentTask is a DataTask and the data has to
// be fetched from the task instead of reading it from files.
IcebergInternalRecordWrapper wrapper =
new IcebergInternalRecordWrapper(table.schema().asStruct(), readSchema.asStruct());
- return (CloseableIterable) CloseableIterable.transform(((DataTask) currentTask).rows(),
- row -> wrapper.wrap((StructLike) row));
+ return (CloseableIterable) CloseableIterable.transform(((DataTask) task).rows(), row -> wrapper.wrap(row));
}
- DataFile file = currentTask.file();
+ DataFile file = task.file();
InputFile inputFile = table.encryption().decrypt(EncryptedFiles.encryptedInput(
table.io().newInputFile(file.path().toString()),
file.keyMetadata()));
@@ -327,13 +346,13 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
CloseableIterable<T> iterable;
switch (file.format()) {
case AVRO:
- iterable = newAvroIterable(inputFile, currentTask, readSchema);
+ iterable = newAvroIterable(inputFile, task, readSchema);
break;
case ORC:
- iterable = newOrcIterable(inputFile, currentTask, readSchema);
+ iterable = newOrcIterable(inputFile, task, readSchema);
break;
case PARQUET:
- iterable = newParquetIterable(inputFile, currentTask, readSchema);
+ iterable = newParquetIterable(inputFile, task, readSchema);
break;
default:
throw new UnsupportedOperationException(
@@ -350,11 +369,11 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
// TODO: Support Pig and Hive object models for IcebergInputFormat
throw new UnsupportedOperationException("Pig and Hive object models are not supported.");
case HIVE:
- return openTask(currentTask, readSchema);
+ return openVectorized(currentTask, readSchema);
case GENERIC:
DeleteFilter deletes = new GenericDeleteFilter(table.io(), currentTask, table.schema(), readSchema);
Schema requiredSchema = deletes.requiredSchema();
- return deletes.filter(openTask(currentTask, requiredSchema));
+ return deletes.filter(openGeneric(currentTask, requiredSchema));
default:
throw new UnsupportedOperationException("Unsupported memory model");
}
@@ -381,63 +400,45 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
.project(readSchema)
.split(task.start(), task.length());
+
if (reuseContainers) {
avroReadBuilder.reuseContainers();
}
+
if (nameMapping != null) {
avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
}
- switch (inMemoryDataModel) {
- case PIG:
- case HIVE:
- // TODO implement value readers for Pig and Hive
- throw new UnsupportedOperationException("Vectorized execution is not yet supported for Iceberg avro " +
- "tables. Please turn off vectorization and retry the query.");
- case GENERIC:
- avroReadBuilder.createReaderFunc(
- (expIcebergSchema, expAvroSchema) ->
- DataReader.create(expIcebergSchema, expAvroSchema,
- constantsMap(task, IdentityPartitionConverters::convertConstant)));
- }
+ avroReadBuilder.createReaderFunc(
+ (expIcebergSchema, expAvroSchema) ->
+ DataReader.create(expIcebergSchema, expAvroSchema,
+ constantsMap(task, IdentityPartitionConverters::convertConstant)));
+
return applyResidualFiltering(avroReadBuilder.build(), residual, readSchema);
}
private CloseableIterable<T> newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
- Map<Integer, ?> idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant);
Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
- CloseableIterable<T> parquetIterator = null;
+ Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
+ .project(readSchema)
+ .filter(residual)
+ .caseSensitive(caseSensitive)
+ .split(task.start(), task.length());
- switch (inMemoryDataModel) {
- case PIG:
- throw new UnsupportedOperationException("Parquet support not yet supported for Pig");
- case HIVE:
- if (MetastoreUtil.hive3PresentOnClasspath()) {
- parquetIterator = HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile, task, idToConstant, context);
- } else {
- throw new UnsupportedOperationException("Vectorized read is unsupported for Hive 2 integration.");
- }
- break;
- case GENERIC:
- Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
- .project(readSchema)
- .filter(residual)
- .caseSensitive(caseSensitive)
- .split(task.start(), task.length());
- if (reuseContainers) {
- parquetReadBuilder.reuseContainers();
- }
- if (nameMapping != null) {
- parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
- }
- parquetReadBuilder.createReaderFunc(
- fileSchema -> GenericParquetReaders.buildReader(
- readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
-
- parquetIterator = parquetReadBuilder.build();
+ if (reuseContainers) {
+ parquetReadBuilder.reuseContainers();
}
- return applyResidualFiltering(parquetIterator, residual, readSchema);
+
+ if (nameMapping != null) {
+ parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+ }
+
+ parquetReadBuilder.createReaderFunc(
+ fileSchema -> GenericParquetReaders.buildReader(
+ readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
+
+ return applyResidualFiltering(parquetReadBuilder.build(), residual, readSchema);
}
private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
@@ -445,36 +446,21 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
Schema readSchemaWithoutConstantAndMetadataFields = schemaWithoutConstantsAndMeta(readSchema, idToConstant);
Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
- CloseableIterable<T> orcIterator = null;
- // ORC does not support reuse containers yet
- switch (inMemoryDataModel) {
- case PIG:
- // TODO: implement value readers for Pig
- throw new UnsupportedOperationException("ORC support not yet supported for Pig");
- case HIVE:
- if (MetastoreUtil.hive3PresentOnClasspath()) {
- orcIterator = HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile, task, idToConstant, context);
- } else {
- throw new UnsupportedOperationException("Vectorized read is unsupported for Hive 2 integration.");
- }
- break;
- case GENERIC:
- ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
- .project(readSchemaWithoutConstantAndMetadataFields)
- .filter(residual)
- .caseSensitive(caseSensitive)
- .split(task.start(), task.length());
- orcReadBuilder.createReaderFunc(
- fileSchema -> GenericOrcReader.buildReader(
- readSchema, fileSchema, idToConstant));
-
- if (nameMapping != null) {
- orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
- }
- orcIterator = orcReadBuilder.build();
+ ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
+ .project(readSchemaWithoutConstantAndMetadataFields)
+ .filter(residual)
+ .caseSensitive(caseSensitive)
+ .split(task.start(), task.length());
+
+ if (nameMapping != null) {
+ orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
}
- return applyResidualFiltering(orcIterator, residual, readSchema);
+ orcReadBuilder.createReaderFunc(
+ fileSchema -> GenericOrcReader.buildReader(
+ readSchema, fileSchema, idToConstant));
+
+ return applyResidualFiltering(orcReadBuilder.build(), residual, readSchema);
}
private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> converter) {
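Why the Path-only openVectorized() is cheaper: Iceberg's Hadoop-backed InputFile resolves a FileSystem when it is constructed (the "FS init" of the commit title), while a Path is just a parsed URI. A standalone illustration under that assumption; the class name and the file location are hypothetical, chosen so the snippet runs with plain hadoop-common:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PathVsFs {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // No I/O here: Path construction only parses the URI.
        Path path = new Path("file:///tmp/warehouse/db/tbl/data/00000-0.orc");
        // The potentially expensive step: instantiating (and caching) the
        // FileSystem. After this patch it runs only where the file is opened.
        FileSystem fs = path.getFileSystem(conf);
        System.out.println(fs.getUri());
      }
    }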
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
index 6547615d46e..b9d3a92f89b 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
@@ -43,8 +43,6 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.hadoop.HadoopInputFile;
-import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
import org.slf4j.Logger;
@@ -64,12 +62,12 @@ public class VectorizedReadUtils {
/**
* Opens the ORC inputFile and reads the metadata information to construct a byte buffer with OrcTail content.
* Note that org.apache.orc (aka Hive bundled) ORC is used, as it is the older version compared to Iceberg's ORC.
- * @param inputFile - the original ORC file - this needs to be accessed to retrieve the original schema for mapping
+ * @param path - the original ORC file - this needs to be accessed to retrieve the original schema for mapping
* @param job - JobConf instance to adjust
* @param fileId - FileID for the input file, serves as cache key in an LLAP setup
* @throws IOException - errors relating to accessing the ORC file
*/
- public static ByteBuffer getSerializedOrcTail(InputFile inputFile, SyntheticFileId fileId, JobConf job)
+ public static ByteBuffer getSerializedOrcTail(Path path, SyntheticFileId fileId, JobConf job)
throws IOException {
ByteBuffer result = null;
@@ -77,7 +75,6 @@ public class VectorizedReadUtils {
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
LlapProxy.getIo() != null) {
MapWork mapWork = LlapHiveUtils.findMapWork(job);
- Path path = new Path(inputFile.location());
PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, mapWork.getPathToPartitionInfo());
// Note: Since Hive doesn't know about partition information of Iceberg tables, partitionDesc is only used to
@@ -100,13 +97,10 @@ public class VectorizedReadUtils {
// Fallback to the simple ORC reader file opening method in the absence or failure of LLAP.
if (result == null) {
org.apache.orc.OrcFile.ReaderOptions readerOptions =
- org.apache.orc.OrcFile.readerOptions(job).useUTCTimestamp(true);
- if (inputFile instanceof HadoopInputFile) {
- readerOptions.filesystem(((HadoopInputFile) inputFile).getFileSystem());
- }
+ org.apache.orc.OrcFile.readerOptions(job).useUTCTimestamp(true).filesystem(path.getFileSystem(job));
try (org.apache.orc.impl.ReaderImpl orcFileReader =
- (org.apache.orc.impl.ReaderImpl) OrcFile.createReader(new Path(inputFile.location()), readerOptions)) {
+ (org.apache.orc.impl.ReaderImpl) OrcFile.createReader(path, readerOptions)) {
return orcFileReader.getSerializedFileFooter();
}
}
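For completeness, the ORC tail call sequence from HiveVectorizedReader (first hunk) against the new signature; a minimal sketch with path, fileId and job as defined there:

    ByteBuffer serializedOrcTail = VectorizedReadUtils.getSerializedOrcTail(path, fileId, job);
    OrcTail orcTail = VectorizedReadUtils.deserializeToOrcTail(serializedOrcTail);
    // The tail then travels with the OrcSplit, so the vectorized reader does
    // not re-read the footer; with LLAP enabled it may come from the cache.

Note that the non-LLAP fallback is also simpler now: instead of setting readerOptions.filesystem() only when the InputFile happened to be a HadoopInputFile, it always calls path.getFileSystem(job).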