You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/10/26 13:04:40 UTC

[GitHub] [hive] marton-bod commented on a change in pull request #2748: HIVE-25628

marton-bod commented on a change in pull request #2748:
URL: https://github.com/apache/hive/pull/2748#discussion_r736498819



##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
##########
@@ -114,19 +115,24 @@ private HiveVectorizedReader() {
           // Need to turn positional schema evolution off since we use column name based schema evolution for projection
           // and Iceberg will make a mapping between the file schema and the current reading schema.
           job.setBoolean(OrcConf.FORCE_POSITIONAL_EVOLUTION.getHiveConfName(), false);
-          VectorizedReadUtils.handleIcebergProjection(inputFile, task, job);
+
+          // Iceberg currently does not track the last modification time of a file. Until that's added, we need to set
+          // Long.MIN_VALUE as last modification time in the fileId triplet.

Review comment:
       nit: the comment is good, but maybe add a TODO as well so we can grep for this kind of tech debt later

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
##########
@@ -20,28 +20,81 @@
 package org.apache.iceberg.orc;
 
 import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.iceberg.org.apache.orc.Reader;
 import org.apache.hive.iceberg.org.apache.orc.TypeDescription;
+import org.apache.hive.iceberg.org.apache.orc.impl.ReaderImpl;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.orc.impl.BufferChunk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utilities that rely on Iceberg code from org.apache.iceberg.orc package.
  */
 public class VectorizedReadUtils {
 
+  private static final Logger LOG = LoggerFactory.getLogger(VectorizedReadUtils.class);
+
   private VectorizedReadUtils() {
 
   }
 
+  private static TypeDescription getSchemaForFile(InputFile inputFile, SyntheticFileId fileId, JobConf job)
+      throws IOException {
+    TypeDescription schema = null;
+
+    if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
+        LlapProxy.getIo() != null) {
+      MapWork mapWork = LlapHiveUtils.findMapWork(job);
+      Path path = new Path(inputFile.location());
+      PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, mapWork.getPathToPartitionInfo());
+
+      // Note: Since Hive doesn't know about partition information of Iceberg tables, partitionDesc is only used to
+      // deduct the table (and DB) name here.
+      CacheTag cacheTag = HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE) ?
+          LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc) : null;
+
+      try {
+        // Schema has to be serialized and deserialized as it is passed between different packages of TypeDescription..
+        BufferChunk tailBuffer = LlapProxy.getIo().getOrcTailFromCache(path, job, cacheTag, fileId).getTailBuffer();
+        schema = ReaderImpl.extractFileTail(tailBuffer.getData()).getSchema();
+      } catch (IOException ioe) {
+        LOG.warn("LLAP is turned on but was unable to get file metadata information through its cache.", ioe);

Review comment:
       Do we want to log the inputFile.location() here for debugging?

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
##########
@@ -50,43 +103,38 @@ private VectorizedReadUtils() {
    * @param job - JobConf instance to adjust
    * @throws IOException - errors relating to accessing the ORC file
    */
-  public static void handleIcebergProjection(InputFile inputFile, FileScanTask task, JobConf job)
-      throws IOException {
-    Reader orcFileReader = ORC.newFileReader(inputFile, job);
-
-    try {
-      // We need to map with the current (i.e. current Hive table columns) full schema (without projections),
-      // as OrcInputFormat will take care of the projections by the use of an include boolean array
-      Schema currentSchema = task.spec().schema();
-      TypeDescription fileSchema = orcFileReader.getSchema();
-
-      TypeDescription readOrcSchema;
-      if (ORCSchemaUtil.hasIds(fileSchema)) {
-        readOrcSchema = ORCSchemaUtil.buildOrcProjection(currentSchema, fileSchema);
-      } else {
-        TypeDescription typeWithIds =
-            ORCSchemaUtil.applyNameMapping(fileSchema, MappingUtil.create(currentSchema));
-        readOrcSchema = ORCSchemaUtil.buildOrcProjection(currentSchema, typeWithIds);
-      }
+  public static void handleIcebergProjection(InputFile inputFile, FileScanTask task, JobConf job,
+      SyntheticFileId fileId) throws IOException {

Review comment:
       please add the new param to the javadoc

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
##########
@@ -20,28 +20,81 @@
 package org.apache.iceberg.orc;
 
 import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.iceberg.org.apache.orc.Reader;
 import org.apache.hive.iceberg.org.apache.orc.TypeDescription;
+import org.apache.hive.iceberg.org.apache.orc.impl.ReaderImpl;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.orc.impl.BufferChunk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utilities that rely on Iceberg code from org.apache.iceberg.orc package.
  */
 public class VectorizedReadUtils {
 
+  private static final Logger LOG = LoggerFactory.getLogger(VectorizedReadUtils.class);
+
   private VectorizedReadUtils() {
 
   }
 
+  private static TypeDescription getSchemaForFile(InputFile inputFile, SyntheticFileId fileId, JobConf job)
+      throws IOException {
+    TypeDescription schema = null;
+
+    if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
+        LlapProxy.getIo() != null) {
+      MapWork mapWork = LlapHiveUtils.findMapWork(job);
+      Path path = new Path(inputFile.location());
+      PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, mapWork.getPathToPartitionInfo());
+
+      // Note: Since Hive doesn't know about partition information of Iceberg tables, partitionDesc is only used to
+      // deduct the table (and DB) name here.
+      CacheTag cacheTag = HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE) ?
+          LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc) : null;
+
+      try {
+        // Schema has to be serialized and deserialized as it is passed between different packages of TypeDescription..

Review comment:
       Do you mean 'different packages' as in we need to transform a hive-module `TypeDescription` into an orc-module `TypeDescription`? There is no foreseeable workaround for this, right (and therefore no point in adding a TODO here)?
   In any case, I would just elaborate the comment a little bit so that future maintainers won't scratch their heads too much on seeing this :)

##########
File path: iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
##########
@@ -20,28 +20,81 @@
 package org.apache.iceberg.orc;
 
 import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.iceberg.org.apache.orc.Reader;
 import org.apache.hive.iceberg.org.apache.orc.TypeDescription;
+import org.apache.hive.iceberg.org.apache.orc.impl.ReaderImpl;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.orc.impl.BufferChunk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utilities that rely on Iceberg code from org.apache.iceberg.orc package.
  */
 public class VectorizedReadUtils {
 
+  private static final Logger LOG = LoggerFactory.getLogger(VectorizedReadUtils.class);
+
   private VectorizedReadUtils() {
 
   }
 
+  private static TypeDescription getSchemaForFile(InputFile inputFile, SyntheticFileId fileId, JobConf job)
+      throws IOException {
+    TypeDescription schema = null;
+
+    if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
+        LlapProxy.getIo() != null) {
+      MapWork mapWork = LlapHiveUtils.findMapWork(job);
+      Path path = new Path(inputFile.location());
+      PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, mapWork.getPathToPartitionInfo());
+
+      // Note: Since Hive doesn't know about partition information of Iceberg tables, partitionDesc is only used to
+      // deduct the table (and DB) name here.
+      CacheTag cacheTag = HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE) ?
+          LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc) : null;
+
+      try {
+        // Schema has to be serialized and deserialized as it is passed between different packages of TypeDescription..
+        BufferChunk tailBuffer = LlapProxy.getIo().getOrcTailFromCache(path, job, cacheTag, fileId).getTailBuffer();
+        schema = ReaderImpl.extractFileTail(tailBuffer.getData()).getSchema();
+      } catch (IOException ioe) {
+        LOG.warn("LLAP is turned on but was unable to get file metadata information through its cache.", ioe);
+      }
+
+    }
+
+    // Fallback to simple ORC reader file opening method in lack of or failure of LLAP.
+    if (schema == null) {
+      Reader orcFileReader = ORC.newFileReader(inputFile, job);

Review comment:
       maybe open this in a try-with-resources? or at least move it inside the try block?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org