You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved here.
Posted to commits@hive.apache.org by rb...@apache.org on 2022/09/11 00:24:43 UTC
[hive] branch master updated: HIVE-26490: Iceberg: Residual expression is constructed for the task from multiple places causing CPU burn (#3574)
This is an automated email from the ASF dual-hosted git repository.
rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f80e547e9c4 HIVE-26490: Iceberg: Residual expression is constructed for the task from multiple places causing CPU burn (#3574)
f80e547e9c4 is described below
commit f80e547e9c4754a1ef032f91c7b9d3a6170230c6
Author: rbalamohan <rb...@apache.org>
AuthorDate: Sun Sep 11 05:54:30 2022 +0530
HIVE-26490: Iceberg: Residual expression is constructed for the task from multiple places causing CPU burn (#3574)
---
.../apache/iceberg/mr/hive/vector/HiveVectorizedReader.java | 10 ++++++----
.../org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java | 7 +++++--
.../main/java/org/apache/iceberg/orc/VectorizedReadUtils.java | 5 ++---
3 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 6e23dff92c6..372a43376e2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -52,6 +52,7 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
@@ -76,7 +77,7 @@ public class HiveVectorizedReader {
}
public static <D> CloseableIterable<D> reader(Path path, FileScanTask task, Map<Integer, ?> idToConstant,
- TaskAttemptContext context) {
+ TaskAttemptContext context, Expression residual) {
// Tweaks on jobConf here are relevant for this task only, so we need to copy it first as context's conf is reused..
JobConf job = new JobConf(context.getConfiguration());
FileFormat format = task.file().format();
@@ -134,7 +135,8 @@ public class HiveVectorizedReader {
switch (format) {
case ORC:
- recordReader = orcRecordReader(job, reporter, task, path, start, length, readColumnIds, fileId);
+ recordReader = orcRecordReader(job, reporter, task, path, start, length, readColumnIds,
+ fileId, residual);
break;
case PARQUET:
@@ -153,7 +155,7 @@ public class HiveVectorizedReader {
private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(JobConf job, Reporter reporter,
FileScanTask task, Path path, long start, long length, List<Integer> readColumnIds,
- SyntheticFileId fileId) throws IOException {
+ SyntheticFileId fileId, Expression residual) throws IOException {
RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
// Need to turn positional schema evolution off since we use column name based schema evolution for projection
@@ -166,7 +168,7 @@ public class HiveVectorizedReader {
OrcTail orcTail = VectorizedReadUtils.deserializeToOrcTail(serializedOrcTail);
VectorizedReadUtils.handleIcebergProjection(task, job,
- VectorizedReadUtils.deserializeToShadedOrcTail(serializedOrcTail).getSchema());
+ VectorizedReadUtils.deserializeToShadedOrcTail(serializedOrcTail).getSchema(), residual);
// If LLAP enabled, try to retrieve an LLAP record reader - this might yield to null in some special cases
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 7617c6b17e9..c60782c6fb7 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -96,6 +96,7 @@ import org.apache.iceberg.util.SerializationUtil;
* @param <T> T is the in memory data model which can either be Pig tuples, Hive rows. Default is Iceberg records
*/
public class IcebergInputFormat<T> extends InputFormat<Void, T> {
+
/**
* Configures the {@code Job} to use the {@code IcebergInputFormat} and
* returns a helper to add further configuration.
@@ -223,7 +224,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
Path.class,
FileScanTask.class,
Map.class,
- TaskAttemptContext.class)
+ TaskAttemptContext.class,
+ Expression.class)
.buildStatic();
} else {
HIVE_VECTORIZED_READER_BUILDER = null;
@@ -324,7 +326,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
// TODO: We have to take care of the EncryptionManager when LLAP and vectorization is used
- CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(path, task, idToConstant, context);
+ CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(path, task,
+ idToConstant, context, residual);
return applyResidualFiltering(iterator, residual, readSchema);
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
index b9d3a92f89b..c05f8bc62ab 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
@@ -44,7 +44,6 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.mapping.MappingUtil;
-import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,7 +133,8 @@ public class VectorizedReadUtils {
* @param fileSchema - ORC file schema of the input file
* @throws IOException - errors relating to accessing the ORC file
*/
- public static void handleIcebergProjection(FileScanTask task, JobConf job, TypeDescription fileSchema)
+ public static void handleIcebergProjection(FileScanTask task, JobConf job,
+ TypeDescription fileSchema, Expression residual)
throws IOException {
// We need to map with the current (i.e. current Hive table columns) full schema (without projections),
@@ -163,7 +163,6 @@ public class VectorizedReadUtils {
job.set(ColumnProjectionUtils.ORC_SCHEMA_STRING, readOrcSchema.toString());
// Predicate pushdowns needs to be adjusted too in case of column renames, we let Iceberg generate this into job
- Expression residual = HiveIcebergInputFormat.residualForTask(task, job);
Expression boundFilter = Binder.bind(currentSchema.asStruct(), residual, false);
// Note the use of the unshaded version of this class here (required for SARG deseralization later)