You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2022/06/13 15:21:03 UTC

[hive] branch master updated: HIVE-26307: Avoid FS init in FileIO::newInputFile in vectorized Iceberg reads (Peter Vary reviewed by Adam Szita) (#3354)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 76d4abe402a HIVE-26307: Avoid FS init in FileIO::newInputFile in vectorized Iceberg reads (Peter Vary reviewed by Adam Szita) (#3354)
76d4abe402a is described below

commit 76d4abe402abdebb6534e9db3f4209cce8b0d4e6
Author: pvary <pv...@cloudera.com>
AuthorDate: Mon Jun 13 17:20:53 2022 +0200

    HIVE-26307: Avoid FS init in FileIO::newInputFile in vectorized Iceberg reads (Peter Vary reviewed by Adam Szita) (#3354)
---
 .../mr/hive/vector/HiveVectorizedReader.java       |  14 +-
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   | 148 ++++++++++-----------
 .../apache/iceberg/orc/VectorizedReadUtils.java    |  14 +-
 3 files changed, 77 insertions(+), 99 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 19fa0f06506..00b9b3c73f0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -51,7 +51,6 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
 import org.apache.iceberg.orc.VectorizedReadUtils;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
@@ -72,11 +71,10 @@ public class HiveVectorizedReader {
 
   }
 
-  public static <D> CloseableIterable<D> reader(InputFile inputFile, FileScanTask task, Map<Integer, ?> idToConstant,
+  public static <D> CloseableIterable<D> reader(Path path, FileScanTask task, Map<Integer, ?> idToConstant,
       TaskAttemptContext context) {
     // Tweaks on jobConf here are relevant for this task only, so we need to copy it first as context's conf is reused..
-    JobConf job = new JobConf((JobConf) context.getConfiguration());
-    Path path = new Path(inputFile.location());
+    JobConf job = new JobConf(context.getConfiguration());
     FileFormat format = task.file().format();
     Reporter reporter = ((MapredIcebergInputFormat.CompatibilityTaskAttemptContextImpl) context).getLegacyReporter();
 
@@ -131,7 +129,7 @@ public class HiveVectorizedReader {
 
       switch (format) {
         case ORC:
-          recordReader = orcRecordReader(job, reporter, task, inputFile, path, start, length, readColumnIds, fileId);
+          recordReader = orcRecordReader(job, reporter, task, path, start, length, readColumnIds, fileId);
           break;
 
         case PARQUET:
@@ -144,12 +142,12 @@ public class HiveVectorizedReader {
       return createVectorizedRowBatchIterable(recordReader, job, partitionColIndices, partitionValues);
 
     } catch (IOException ioe) {
-      throw new RuntimeException("Error creating vectorized record reader for " + inputFile, ioe);
+      throw new RuntimeException("Error creating vectorized record reader for " + path, ioe);
     }
   }
 
   private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(JobConf job, Reporter reporter,
-      FileScanTask task, InputFile inputFile, Path path, long start, long length, List<Integer> readColumnIds,
+      FileScanTask task, Path path, long start, long length, List<Integer> readColumnIds,
       SyntheticFileId fileId) throws IOException {
     RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
 
@@ -159,7 +157,7 @@ public class HiveVectorizedReader {
 
     // Metadata information has to be passed along in the OrcSplit. Without specifying this, the vectorized
     // reader will assume that the ORC file ends at the task's start + length, and might fail reading the tail..
-    ByteBuffer serializedOrcTail = VectorizedReadUtils.getSerializedOrcTail(inputFile, fileId, job);
+    ByteBuffer serializedOrcTail = VectorizedReadUtils.getSerializedOrcTail(path, fileId, job);
     OrcTail orcTail = VectorizedReadUtils.deserializeToOrcTail(serializedOrcTail);
 
     VectorizedReadUtils.handleIcebergProjection(task, job,
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 7bbd03a09ed..7617c6b17e9 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -31,6 +31,7 @@ import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.LlapHiveUtils;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.mapred.JobConf;
@@ -44,6 +45,7 @@ import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTableScan;
 import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
@@ -80,6 +82,7 @@ import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
 import org.apache.iceberg.mr.hive.IcebergAcidUtil;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
@@ -217,7 +220,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       if (MetastoreUtil.hive3PresentOnClasspath()) {
         HIVE_VECTORIZED_READER_BUILDER = DynMethods.builder("reader")
             .impl(HIVE_VECTORIZED_READER_CLASS,
-                InputFile.class,
+                Path.class,
                 FileScanTask.class,
                 Map.class,
                 TaskAttemptContext.class)
@@ -309,17 +312,33 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       currentIterator.close();
     }
 
-    private CloseableIterable<T> openTask(FileScanTask currentTask, Schema readSchema) {
-      if (currentTask.isDataTask()) {
+    private CloseableIterable<T> openVectorized(FileScanTask task, Schema readSchema) {
+      Preconditions.checkArgument(!task.file().format().equals(FileFormat.AVRO),
+          "Vectorized execution is not yet supported for Iceberg avro tables. " +
+              "Please turn off vectorization and retry the query.");
+      Preconditions.checkArgument(MetastoreUtil.hive3PresentOnClasspath(),
+          "Vectorized read is unsupported for Hive 2 integration.");
+
+      Path path = new Path(task.file().path().toString());
+      Map<Integer, ?> idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant);
+      Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
+
+      // TODO: We have to take care of the EncryptionManager when LLAP and vectorization is used
+      CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(path, task, idToConstant, context);
+
+      return applyResidualFiltering(iterator, residual, readSchema);
+    }
+
+    private CloseableIterable<T> openGeneric(FileScanTask task, Schema readSchema) {
+      if (task.isDataTask()) {
         // When querying metadata tables, the currentTask is a DataTask and the data has to
         // be fetched from the task instead of reading it from files.
         IcebergInternalRecordWrapper wrapper =
             new IcebergInternalRecordWrapper(table.schema().asStruct(), readSchema.asStruct());
-        return (CloseableIterable) CloseableIterable.transform(((DataTask) currentTask).rows(),
-            row -> wrapper.wrap((StructLike) row));
+        return (CloseableIterable) CloseableIterable.transform(((DataTask) task).rows(), row -> wrapper.wrap(row));
       }
 
-      DataFile file = currentTask.file();
+      DataFile file = task.file();
       InputFile inputFile = table.encryption().decrypt(EncryptedFiles.encryptedInput(
           table.io().newInputFile(file.path().toString()),
           file.keyMetadata()));
@@ -327,13 +346,13 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       CloseableIterable<T> iterable;
       switch (file.format()) {
         case AVRO:
-          iterable = newAvroIterable(inputFile, currentTask, readSchema);
+          iterable = newAvroIterable(inputFile, task, readSchema);
           break;
         case ORC:
-          iterable = newOrcIterable(inputFile, currentTask, readSchema);
+          iterable = newOrcIterable(inputFile, task, readSchema);
           break;
         case PARQUET:
-          iterable = newParquetIterable(inputFile, currentTask, readSchema);
+          iterable = newParquetIterable(inputFile, task, readSchema);
           break;
         default:
           throw new UnsupportedOperationException(
@@ -350,11 +369,11 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
           // TODO: Support Pig and Hive object models for IcebergInputFormat
           throw new UnsupportedOperationException("Pig and Hive object models are not supported.");
         case HIVE:
-          return openTask(currentTask, readSchema);
+          return openVectorized(currentTask, readSchema);
         case GENERIC:
           DeleteFilter deletes = new GenericDeleteFilter(table.io(), currentTask, table.schema(), readSchema);
           Schema requiredSchema = deletes.requiredSchema();
-          return deletes.filter(openTask(currentTask, requiredSchema));
+          return deletes.filter(openGeneric(currentTask, requiredSchema));
         default:
           throw new UnsupportedOperationException("Unsupported memory model");
       }
@@ -381,63 +400,45 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
           .project(readSchema)
           .split(task.start(), task.length());
+
       if (reuseContainers) {
         avroReadBuilder.reuseContainers();
       }
+
       if (nameMapping != null) {
         avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
       }
 
-      switch (inMemoryDataModel) {
-        case PIG:
-        case HIVE:
-          // TODO implement value readers for Pig and Hive
-          throw new UnsupportedOperationException("Vectorized execution is not yet supported for Iceberg avro " +
-              "tables. Please turn off vectorization and retry the query.");
-        case GENERIC:
-          avroReadBuilder.createReaderFunc(
-              (expIcebergSchema, expAvroSchema) ->
-                  DataReader.create(expIcebergSchema, expAvroSchema,
-                      constantsMap(task, IdentityPartitionConverters::convertConstant)));
-      }
+      avroReadBuilder.createReaderFunc(
+          (expIcebergSchema, expAvroSchema) ->
+              DataReader.create(expIcebergSchema, expAvroSchema,
+                  constantsMap(task, IdentityPartitionConverters::convertConstant)));
+
       return applyResidualFiltering(avroReadBuilder.build(), residual, readSchema);
     }
 
     private CloseableIterable<T> newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
-      Map<Integer, ?> idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant);
       Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
 
-      CloseableIterable<T> parquetIterator = null;
+      Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
+          .project(readSchema)
+          .filter(residual)
+          .caseSensitive(caseSensitive)
+          .split(task.start(), task.length());
 
-      switch (inMemoryDataModel) {
-        case PIG:
-          throw new UnsupportedOperationException("Parquet support not yet supported for Pig");
-        case HIVE:
-          if (MetastoreUtil.hive3PresentOnClasspath()) {
-            parquetIterator = HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile, task, idToConstant, context);
-          } else {
-            throw new UnsupportedOperationException("Vectorized read is unsupported for Hive 2 integration.");
-          }
-          break;
-        case GENERIC:
-          Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
-              .project(readSchema)
-              .filter(residual)
-              .caseSensitive(caseSensitive)
-              .split(task.start(), task.length());
-          if (reuseContainers) {
-            parquetReadBuilder.reuseContainers();
-          }
-          if (nameMapping != null) {
-            parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-          }
-          parquetReadBuilder.createReaderFunc(
-              fileSchema -> GenericParquetReaders.buildReader(
-                  readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
-
-          parquetIterator = parquetReadBuilder.build();
+      if (reuseContainers) {
+        parquetReadBuilder.reuseContainers();
       }
-      return applyResidualFiltering(parquetIterator, residual, readSchema);
+
+      if (nameMapping != null) {
+        parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+      }
+
+      parquetReadBuilder.createReaderFunc(
+          fileSchema -> GenericParquetReaders.buildReader(
+              readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
+
+      return applyResidualFiltering(parquetReadBuilder.build(), residual, readSchema);
     }
 
     private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
@@ -445,36 +446,21 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       Schema readSchemaWithoutConstantAndMetadataFields = schemaWithoutConstantsAndMeta(readSchema, idToConstant);
       Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
 
-      CloseableIterable<T> orcIterator = null;
-      // ORC does not support reuse containers yet
-      switch (inMemoryDataModel) {
-        case PIG:
-          // TODO: implement value readers for Pig
-          throw new UnsupportedOperationException("ORC support not yet supported for Pig");
-        case HIVE:
-          if (MetastoreUtil.hive3PresentOnClasspath()) {
-            orcIterator = HIVE_VECTORIZED_READER_BUILDER.invoke(inputFile, task, idToConstant, context);
-          } else {
-            throw new UnsupportedOperationException("Vectorized read is unsupported for Hive 2 integration.");
-          }
-          break;
-        case GENERIC:
-          ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
-              .project(readSchemaWithoutConstantAndMetadataFields)
-              .filter(residual)
-              .caseSensitive(caseSensitive)
-              .split(task.start(), task.length());
-          orcReadBuilder.createReaderFunc(
-              fileSchema -> GenericOrcReader.buildReader(
-                  readSchema, fileSchema, idToConstant));
-
-          if (nameMapping != null) {
-            orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-          }
-          orcIterator = orcReadBuilder.build();
+      ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
+          .project(readSchemaWithoutConstantAndMetadataFields)
+          .filter(residual)
+          .caseSensitive(caseSensitive)
+          .split(task.start(), task.length());
+
+      if (nameMapping != null) {
+        orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
       }
 
-      return applyResidualFiltering(orcIterator, residual, readSchema);
+      orcReadBuilder.createReaderFunc(
+          fileSchema -> GenericOrcReader.buildReader(
+              readSchema, fileSchema, idToConstant));
+
+      return applyResidualFiltering(orcReadBuilder.build(), residual, readSchema);
     }
 
     private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> converter) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
index 6547615d46e..b9d3a92f89b 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
@@ -43,8 +43,6 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.hadoop.HadoopInputFile;
-import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.MappingUtil;
 import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
 import org.slf4j.Logger;
@@ -64,12 +62,12 @@ public class VectorizedReadUtils {
   /**
    * Opens the ORC inputFile and reads the metadata information to construct a byte buffer with OrcTail content.
    * Note that org.apache.orc (aka Hive bundled) ORC is used, as it is the older version compared to Iceberg's ORC.
-   * @param inputFile - the original ORC file - this needs to be accessed to retrieve the original schema for mapping
+   * @param path - the original ORC file - this needs to be accessed to retrieve the original schema for mapping
    * @param job - JobConf instance to adjust
    * @param fileId - FileID for the input file, serves as cache key in an LLAP setup
    * @throws IOException - errors relating to accessing the ORC file
    */
-  public static ByteBuffer getSerializedOrcTail(InputFile inputFile, SyntheticFileId fileId, JobConf job)
+  public static ByteBuffer getSerializedOrcTail(Path path, SyntheticFileId fileId, JobConf job)
       throws IOException {
 
     ByteBuffer result = null;
@@ -77,7 +75,6 @@ public class VectorizedReadUtils {
     if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
         LlapProxy.getIo() != null) {
       MapWork mapWork = LlapHiveUtils.findMapWork(job);
-      Path path = new Path(inputFile.location());
       PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, mapWork.getPathToPartitionInfo());
 
       // Note: Since Hive doesn't know about partition information of Iceberg tables, partitionDesc is only used to
@@ -100,13 +97,10 @@ public class VectorizedReadUtils {
     // Fallback to simple ORC reader file opening method in lack of or failure of LLAP.
     if (result == null) {
       org.apache.orc.OrcFile.ReaderOptions readerOptions =
-          org.apache.orc.OrcFile.readerOptions(job).useUTCTimestamp(true);
-      if (inputFile instanceof HadoopInputFile) {
-        readerOptions.filesystem(((HadoopInputFile) inputFile).getFileSystem());
-      }
+          org.apache.orc.OrcFile.readerOptions(job).useUTCTimestamp(true).filesystem(path.getFileSystem(job));
 
       try (org.apache.orc.impl.ReaderImpl orcFileReader =
-               (org.apache.orc.impl.ReaderImpl) OrcFile.createReader(new Path(inputFile.location()), readerOptions)) {
+               (org.apache.orc.impl.ReaderImpl) OrcFile.createReader(path, readerOptions)) {
         return orcFileReader.getSerializedFileFooter();
       }
     }