You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by rb...@apache.org on 2022/09/11 00:24:43 UTC

[hive] branch master updated: HIVE-26490: Iceberg: Residual expression is constructed for the task from multiple places causing CPU burn (#3574)

This is an automated email from the ASF dual-hosted git repository.

rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new f80e547e9c4 HIVE-26490: Iceberg: Residual expression is constructed for the task from multiple places causing CPU burn (#3574)
f80e547e9c4 is described below

commit f80e547e9c4754a1ef032f91c7b9d3a6170230c6
Author: rbalamohan <rb...@apache.org>
AuthorDate: Sun Sep 11 05:54:30 2022 +0530

    HIVE-26490: Iceberg: Residual expression is constructed for the task from multiple places causing CPU burn (#3574)
---
 .../apache/iceberg/mr/hive/vector/HiveVectorizedReader.java    | 10 ++++++----
 .../org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java    |  7 +++++--
 .../main/java/org/apache/iceberg/orc/VectorizedReadUtils.java  |  5 ++---
 3 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 6e23dff92c6..372a43376e2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -52,6 +52,7 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
@@ -76,7 +77,7 @@ public class HiveVectorizedReader {
   }
 
   public static <D> CloseableIterable<D> reader(Path path, FileScanTask task, Map<Integer, ?> idToConstant,
-      TaskAttemptContext context) {
+      TaskAttemptContext context, Expression residual) {
     // Tweaks on jobConf here are relevant for this task only, so we need to copy it first as context's conf is reused..
     JobConf job = new JobConf(context.getConfiguration());
     FileFormat format = task.file().format();
@@ -134,7 +135,8 @@ public class HiveVectorizedReader {
 
       switch (format) {
         case ORC:
-          recordReader = orcRecordReader(job, reporter, task, path, start, length, readColumnIds, fileId);
+          recordReader = orcRecordReader(job, reporter, task, path, start, length, readColumnIds,
+              fileId, residual);
           break;
 
         case PARQUET:
@@ -153,7 +155,7 @@ public class HiveVectorizedReader {
 
   private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(JobConf job, Reporter reporter,
       FileScanTask task, Path path, long start, long length, List<Integer> readColumnIds,
-      SyntheticFileId fileId) throws IOException {
+      SyntheticFileId fileId, Expression residual) throws IOException {
     RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
 
     // Need to turn positional schema evolution off since we use column name based schema evolution for projection
@@ -166,7 +168,7 @@ public class HiveVectorizedReader {
     OrcTail orcTail = VectorizedReadUtils.deserializeToOrcTail(serializedOrcTail);
 
     VectorizedReadUtils.handleIcebergProjection(task, job,
-        VectorizedReadUtils.deserializeToShadedOrcTail(serializedOrcTail).getSchema());
+        VectorizedReadUtils.deserializeToShadedOrcTail(serializedOrcTail).getSchema(), residual);
 
     // If LLAP enabled, try to retrieve an LLAP record reader - this might yield to null in some special cases
     if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 7617c6b17e9..c60782c6fb7 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -96,6 +96,7 @@ import org.apache.iceberg.util.SerializationUtil;
  * @param <T> T is the in memory data model which can either be Pig tuples, Hive rows. Default is Iceberg records
  */
 public class IcebergInputFormat<T> extends InputFormat<Void, T> {
+
   /**
    * Configures the {@code Job} to use the {@code IcebergInputFormat} and
    * returns a helper to add further configuration.
@@ -223,7 +224,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
                 Path.class,
                 FileScanTask.class,
                 Map.class,
-                TaskAttemptContext.class)
+                TaskAttemptContext.class,
+                Expression.class)
             .buildStatic();
       } else {
         HIVE_VECTORIZED_READER_BUILDER = null;
@@ -324,7 +326,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
 
       // TODO: We have to take care of the EncryptionManager when LLAP and vectorization is used
-      CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(path, task, idToConstant, context);
+      CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(path, task,
+          idToConstant, context, residual);
 
       return applyResidualFiltering(iterator, residual, readSchema);
     }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
index b9d3a92f89b..c05f8bc62ab 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
@@ -44,7 +44,6 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.mapping.MappingUtil;
-import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,7 +133,8 @@ public class VectorizedReadUtils {
    * @param fileSchema - ORC file schema of the input file
    * @throws IOException - errors relating to accessing the ORC file
    */
-  public static void handleIcebergProjection(FileScanTask task, JobConf job, TypeDescription fileSchema)
+  public static void handleIcebergProjection(FileScanTask task, JobConf job,
+      TypeDescription fileSchema, Expression residual)
       throws IOException {
 
     // We need to map with the current (i.e. current Hive table columns) full schema (without projections),
@@ -163,7 +163,6 @@ public class VectorizedReadUtils {
     job.set(ColumnProjectionUtils.ORC_SCHEMA_STRING, readOrcSchema.toString());
 
     // Predicate pushdowns needs to be adjusted too in case of column renames, we let Iceberg generate this into job
-    Expression residual = HiveIcebergInputFormat.residualForTask(task, job);
     Expression boundFilter = Binder.bind(currentSchema.asStruct(), residual, false);
 
     // Note the use of the unshaded version of this class here (required for SARG deseralization later)