Posted to commits@hive.apache.org by sz...@apache.org on 2022/04/20 15:47:07 UTC
[hive] branch master updated: HIVE-26137: Optimized transfer of Iceberg residual expressions from AM to execution (#3203) (Adam Szita, reviewed by Marton Bod)
This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6b5e57e1b3b HIVE-26137: Optimized transfer of Iceberg residual expressions from AM to execution (#3203) (Adam Szita, reviewed by Marton Bod)
6b5e57e1b3b is described below
commit 6b5e57e1b3b2419335d890ca8ad86fd520b36f2b
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Wed Apr 20 17:46:55 2022 +0200
HIVE-26137: Optimized transfer of Iceberg residual expressions from AM to execution (#3203) (Adam Szita, reviewed by Marton Bod)
---
.../iceberg/mr/hive/HiveIcebergInputFormat.java | 53 ++++++++++++---
.../apache/iceberg/mr/hive/HiveIcebergSplit.java | 41 ------------
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 21 ++++--
.../apache/iceberg/orc/VectorizedReadUtils.java | 20 +++---
.../mr/{hive => }/TestIcebergInputFormats.java | 76 +---------------------
.../iceberg/mr/TestInputFormatReaderDeletes.java | 1 -
6 files changed, 71 insertions(+), 141 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 57f5d48c223..38c75fad949 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -41,9 +41,12 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.hive.MetastoreUtil;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader;
@@ -80,21 +83,55 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
}
}
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- // Convert Hive filter to Iceberg filter
- String hiveFilter = job.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+ /**
+ * Converts the Hive filter found in the job conf to an Iceberg filter expression.
+ * @param conf - job conf
+ * @return - Iceberg data filter expression
+ */
+ static Expression icebergDataFilterFromHiveConf(Configuration conf) {
+ Expression icebergFilter = SerializationUtil.deserializeFromBase64(conf.get(InputFormatConfig.FILTER_EXPRESSION));
+ if (icebergFilter != null) {
+ // in case we already have it prepared
+ return icebergFilter;
+ }
+ String hiveFilter = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
if (hiveFilter != null) {
ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
- .deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
- SearchArgument sarg = ConvertAstToSearchArg.create(job, exprNodeDesc);
+ .deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
+ SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprNodeDesc);
try {
- Expression filter = HiveIcebergFilterFactory.generateFilterExpression(sarg);
- job.set(InputFormatConfig.FILTER_EXPRESSION, SerializationUtil.serializeToBase64(filter));
+ return HiveIcebergFilterFactory.generateFilterExpression(sarg);
} catch (UnsupportedOperationException e) {
LOG.warn("Unable to create Iceberg filter, continuing without filter (will be applied by Hive later): ", e);
}
}
+ return null;
+ }
+
+ /**
+ * Converts the Hive filter found in the passed job conf to an Iceberg filter expression, then evaluates it
+ * against the task's partition value, producing a residual filter expression.
+ * @param task - file scan task to evaluate the expression against
+ * @param conf - job conf
+ * @return - Iceberg residual filter expression
+ */
+ public static Expression residualForTask(FileScanTask task, Configuration conf) {
+ Expression dataFilter = icebergDataFilterFromHiveConf(conf);
+ if (dataFilter == null) {
+ return Expressions.alwaysTrue();
+ }
+ return ResidualEvaluator.of(
+ task.spec(), dataFilter,
+ conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT)
+ ).residualFor(task.file().partition());
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ Expression filter = icebergDataFilterFromHiveConf(job);
+ if (filter != null) {
+ job.set(InputFormatConfig.FILTER_EXPRESSION, SerializationUtil.serializeToBase64(filter));
+ }
job.set(InputFormatConfig.SELECTED_COLUMNS, job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, ""));
job.set(InputFormatConfig.AS_OF_TIMESTAMP, job.get(TableScanDesc.AS_OF_TIMESTAMP, "-1"));
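For illustration, a minimal standalone sketch of what the new residualForTask() helper computes: ResidualEvaluator reduces the data filter against a task's concrete partition value, leaving only the part that still has to be applied per row. The schema, spec and values below are hypothetical, not taken from the patch.

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.expressions.ResidualEvaluator;
    import org.apache.iceberg.types.Types;

    public class ResidualSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.required(2, "date", Types.StringType.get()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("date").build();

        Expression filter = Expressions.and(
            Expressions.equal("date", "2020-03-20"),
            Expressions.equal("id", 0));

        // The partition value a given file scan task belongs to
        GenericRecord partition = GenericRecord.create(spec.partitionType());
        partition.setField("date", "2020-03-20");

        // date == "2020-03-20" is fully answered by the partition value,
        // so only id == 0 survives as the residual
        Expression residual = ResidualEvaluator.of(spec, filter, true).residualFor(partition);
        System.out.println(residual); // prints roughly: ref(name="id") == 0
      }
    }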
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
index b698d869d30..a9827ffb976 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
@@ -28,10 +28,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.tez.HashableInputSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.common.DynClasses;
-import org.apache.iceberg.common.DynFields;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.mr.mapreduce.IcebergSplit;
import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
import org.apache.iceberg.relocated.com.google.common.primitives.Longs;
@@ -97,45 +93,8 @@ public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer
return 0;
}
- /**
- * This hack removes residual expressions from the file scan task just before split serialization.
- * Residuals can sometime take up too much space in the payload causing Tez AM to OOM.
- * Unfortunately Tez AM doesn't distribute splits in a streamed way, that is, it serializes all splits for a job
- * before sending them out to executors. Some residuals may take ~ 1 MB in memory, multiplied with thousands of splits
- * could kill the Tez AM JVM.
- * Until the streamed split distribution is implemented we will kick residuals out of the split, essentially the
- * executor side won't use it anyway (yet).
- */
- private static final Class<?> SPLIT_SCAN_TASK_CLAZZ;
- private static final DynFields.UnboundField<Object> FILE_SCAN_TASK_FIELD;
- private static final DynFields.UnboundField<Object> RESIDUALS_FIELD;
- private static final DynFields.UnboundField<Object> EXPR_FIELD;
- private static final DynFields.UnboundField<Object> UNPARTITIONED_EXPR_FIELD;
-
- static {
- SPLIT_SCAN_TASK_CLAZZ = DynClasses.builder().impl("org.apache.iceberg.BaseFileScanTask$SplitScanTask").build();
- FILE_SCAN_TASK_FIELD = DynFields.builder().hiddenImpl(SPLIT_SCAN_TASK_CLAZZ, "fileScanTask").build();
- RESIDUALS_FIELD = DynFields.builder().hiddenImpl("org.apache.iceberg.BaseFileScanTask", "residuals").build();
- EXPR_FIELD = DynFields.builder().hiddenImpl(ResidualEvaluator.class, "expr").build();
- UNPARTITIONED_EXPR_FIELD = DynFields.builder().hiddenImpl("org.apache.iceberg.expressions." +
- "ResidualEvaluator$UnpartitionedResidualEvaluator", "expr").build();
- }
-
@Override
public void write(DataOutput out) throws IOException {
- for (FileScanTask fileScanTask : icebergSplit().task().files()) {
- if (fileScanTask.getClass().isAssignableFrom(SPLIT_SCAN_TASK_CLAZZ)) {
-
- Object residuals = RESIDUALS_FIELD.get(FILE_SCAN_TASK_FIELD.get(fileScanTask));
-
- if (fileScanTask.spec().isPartitioned()) {
- EXPR_FIELD.set(residuals, Expressions.alwaysTrue());
- } else {
- UNPARTITIONED_EXPR_FIELD.set(residuals, Expressions.alwaysTrue());
- }
-
- }
- }
byte[] bytes = SerializationUtil.serializeToBytes(tableLocation);
out.writeInt(bytes.length);
out.write(bytes);
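With the reflective residual-scrubbing hack removed, write() is reduced to the plain length-prefixed payload pattern visible above. A self-contained round trip of that pattern, with a made-up table location (the read side here mirrors what a readFields() counterpart would do):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.iceberg.util.SerializationUtil;

    public class SplitPayloadSketch {
      public static void main(String[] args) throws IOException {
        // Write side: length prefix, then the serialized payload
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        byte[] bytes = SerializationUtil.serializeToBytes("file:///tmp/warehouse/tbl");
        out.writeInt(bytes.length);
        out.write(bytes);

        // Read side of the same pattern
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
        byte[] read = new byte[in.readInt()];
        in.readFully(read);
        String tableLocation = SerializationUtil.deserializeFromBytes(read);
        System.out.println(tableLocation); // file:///tmp/warehouse/tbl
      }
    }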
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 2ffb1539512..085491ffa82 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -77,6 +77,7 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.orc.ORC;
@@ -133,7 +134,6 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
Long openFileCost = splitSize > 0 ? splitSize : TableProperties.SPLIT_SIZE_DEFAULT;
scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(openFileCost));
}
-
String schemaStr = conf.get(InputFormatConfig.READ_SCHEMA);
if (schemaStr != null) {
scan.project(SchemaParser.fromJson(schemaStr));
@@ -147,7 +147,10 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
// TODO add a filter parser to get rid of Serialization
Expression filter = SerializationUtil.deserializeFromBase64(conf.get(InputFormatConfig.FILTER_EXPRESSION));
if (filter != null) {
- scan = scan.filter(filter);
+ // In order to prevent the filter expression from being attached to every generated file scan task, we call
+ // ignoreResiduals() here. The passed-in filter will still be effective during split generation.
+ // On the execution side, residual expressions will be derived from the passed job conf.
+ scan = scan.filter(filter).ignoreResiduals();
}
return scan;
@@ -374,6 +377,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
private CloseableIterable<T> newAvroIterable(
InputFile inputFile, FileScanTask task, Schema readSchema) {
+ Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
.project(readSchema)
.split(task.start(), task.length());
@@ -396,11 +400,13 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
DataReader.create(expIcebergSchema, expAvroSchema,
constantsMap(task, IdentityPartitionConverters::convertConstant)));
}
- return applyResidualFiltering(avroReadBuilder.build(), task.residual(), readSchema);
+ return applyResidualFiltering(avroReadBuilder.build(), residual, readSchema);
}
private CloseableIterable<T> newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
Map<Integer, ?> idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant);
+ Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
+
CloseableIterable<T> parquetIterator = null;
switch (inMemoryDataModel) {
@@ -416,7 +422,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
case GENERIC:
Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
.project(readSchema)
- .filter(task.residual())
+ .filter(residual)
.caseSensitive(caseSensitive)
.split(task.start(), task.length());
if (reuseContainers) {
@@ -431,12 +437,13 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
parquetIterator = parquetReadBuilder.build();
}
- return applyResidualFiltering(parquetIterator, task.residual(), readSchema);
+ return applyResidualFiltering(parquetIterator, residual, readSchema);
}
private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
Map<Integer, ?> idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant);
Schema readSchemaWithoutConstantAndMetadataFields = schemaWithoutConstantsAndMeta(readSchema, idToConstant);
+ Expression residual = HiveIcebergInputFormat.residualForTask(task, context.getConfiguration());
CloseableIterable<T> orcIterator = null;
// ORC does not support reuse containers yet
@@ -454,7 +461,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
case GENERIC:
ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
.project(readSchemaWithoutConstantAndMetadataFields)
- .filter(task.residual())
+ .filter(residual)
.caseSensitive(caseSensitive)
.split(task.start(), task.length());
orcReadBuilder.createReaderFunc(
@@ -467,7 +474,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
orcIterator = orcReadBuilder.build();
}
- return applyResidualFiltering(orcIterator, task.residual(), readSchema);
+ return applyResidualFiltering(orcIterator, residual, readSchema);
}
private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> converter) {
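Putting the two sides together, a compile-level sketch (not part of the patch; the method and variable names are hypothetical) of the new division of labor: the filter still prunes files and splits at planning time, while each executor derives its task's residual locally from the job conf.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;

    public class ScanPlanningSketch {
      static void planAndRead(Table table, Expression filter, Configuration conf) throws IOException {
        try (CloseableIterable<FileScanTask> tasks = table.newScan()
            .filter(filter)        // still prunes files and splits during planning
            .ignoreResiduals()     // task.residual() becomes Expressions.alwaysTrue()
            .planFiles()) {
          for (FileScanTask task : tasks) {
            // Execution side recomputes the residual from the conf, as the readers above do
            Expression residual = HiveIcebergInputFormat.residualForTask(task, conf);
            // ... open the data file and apply 'residual', as in newParquetIterable/newOrcIterable
          }
        }
      }
    }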
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
index 2f6b3abf289..17a05a4bcc5 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
import org.apache.orc.impl.BufferChunk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,18 +161,17 @@ public class VectorizedReadUtils {
job.set(ColumnProjectionUtils.ORC_SCHEMA_STRING, readOrcSchema.toString());
// Predicate pushdown needs to be adjusted too in case of column renames; we let Iceberg generate this into the job
- if (task.residual() != null) {
- Expression boundFilter = Binder.bind(currentSchema.asStruct(), task.residual(), false);
+ Expression residual = HiveIcebergInputFormat.residualForTask(task, job);
+ Expression boundFilter = Binder.bind(currentSchema.asStruct(), residual, false);
- // Note the use of the unshaded version of this class here (required for SARG deseralization later)
- org.apache.hadoop.hive.ql.io.sarg.SearchArgument sarg =
- ExpressionToOrcSearchArgument.convert(boundFilter, readOrcSchema);
- if (sarg != null) {
- job.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
- job.unset(ConvertAstToSearchArg.SARG_PUSHDOWN);
+ // Note the use of the unshaded version of this class here (required for SARG deserialization later)
+ org.apache.hadoop.hive.ql.io.sarg.SearchArgument sarg =
+ ExpressionToOrcSearchArgument.convert(boundFilter, readOrcSchema);
+ if (sarg != null) {
+ job.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
+ job.unset(ConvertAstToSearchArg.SARG_PUSHDOWN);
- job.set(ConvertAstToSearchArg.SARG_PUSHDOWN, ConvertAstToSearchArg.sargToKryo(sarg));
- }
+ job.set(ConvertAstToSearchArg.SARG_PUSHDOWN, ConvertAstToSearchArg.sargToKryo(sarg));
}
}
}
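The Binder.bind() step above is what ties the residual to concrete field ids before the ORC SearchArgument conversion. In isolation, against a hypothetical one-column schema:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.expressions.Binder;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.types.Types;

    public class BindSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()));
        // Binding resolves the name-based reference to field id 1, which keeps the
        // pushdown correct even if the column was renamed since the file was written
        Expression bound = Binder.bind(schema.asStruct(), Expressions.equal("id", 0), false);
        System.out.println(bound);
      }
    }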
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestIcebergInputFormats.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
similarity index 86%
rename from iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestIcebergInputFormats.java
rename to iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
index ff45cf8521a..18e15d45e87 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestIcebergInputFormats.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -17,12 +17,8 @@
* under the License.
*/
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -44,7 +40,6 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
@@ -59,9 +54,7 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.mr.Catalogs;
-import org.apache.iceberg.mr.InputFormatConfig;
-import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
@@ -81,9 +74,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.iceberg.types.Types.NestedField.required;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
@@ -212,34 +203,6 @@ public class TestIcebergInputFormats {
testInputFormat.create(builder.conf()).validate(writeRecords);
}
- @Test
- public void testFailedResidualFiltering() throws Exception {
- helper.createTable();
-
- List<Record> expectedRecords = helper.generateRandomRecords(2, 0L);
- expectedRecords.get(0).set(2, "2020-03-20");
- expectedRecords.get(1).set(2, "2020-03-20");
-
- helper.appendToTable(Row.of("2020-03-20", 0), expectedRecords);
-
- builder.useHiveRows()
- .filter(Expressions.and(
- Expressions.equal("date", "2020-03-20"),
- Expressions.equal("id", 0)));
-
- AssertHelpers.assertThrows(
- "Residuals are not evaluated today for Iceberg Generics In memory model of HIVE",
- UnsupportedOperationException.class, "Filter expression ref(name=\"id\") == 0 is not completely satisfied.",
- () -> testInputFormat.create(builder.conf()));
-
- builder.usePigTuples();
-
- AssertHelpers.assertThrows(
- "Residuals are not evaluated today for Iceberg Generics In memory model of PIG",
- UnsupportedOperationException.class, "Filter expression ref(name=\"id\") == 0 is not completely satisfied.",
- () -> testInputFormat.create(builder.conf()));
- }
-
@Test
public void testProjection() throws Exception {
helper.createTable();
@@ -409,41 +372,6 @@ public class TestIcebergInputFormats {
mapWork.getCacheAffinity());
}
- @Test
- public void testResidualsUnserialized() throws Exception {
- helper.createUnpartitionedTable();
- List<Record> expectedRecords = helper.generateRandomRecords(10, 0L);
- helper.appendToTable(null, expectedRecords);
- builder.filter(Expressions.greaterThan("id", 123));
-
- for (InputSplit split : testInputFormat.create(builder.conf()).getSplits()) {
-
- HiveIcebergSplit originalSplit = new HiveIcebergSplit((IcebergSplit) split, "noop");
-
- // In the original split, residual should still be there as per above expression
- assertNotEquals(
- Expressions.alwaysTrue(),
- originalSplit.icebergSplit().task().files().stream().findFirst().get().residual()
- );
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(baos);
- originalSplit.write(out);
-
- HiveIcebergSplit deserializedSplit = new HiveIcebergSplit();
- ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
- DataInputStream in = new DataInputStream(bais);
- deserializedSplit.readFields(in);
-
- // After ser/de the expression should be always-true
- assertEquals(
- Expressions.alwaysTrue(),
- deserializedSplit.icebergSplit().task().files().stream().findFirst().get().residual()
- );
- }
-
- }
-
// TODO - Capture template type T in toString method: https://github.com/apache/iceberg/issues/1542
public abstract static class TestInputFormat<T> {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
index 3d09e7c9e83..2ba4e50e8aa 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
@@ -34,7 +34,6 @@ import org.apache.iceberg.TableOperations;
import org.apache.iceberg.data.DeleteReadTests;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.mr.hive.TestIcebergInputFormats;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
import org.junit.Before;
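Finally, the transfer mechanism the commit title refers to, in isolation: getSplits() serializes the single table-level filter into the job conf once, and every executor deserializes it from there instead of receiving a residual inside each split. A minimal round-trip sketch (the filter value is made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.mr.InputFormatConfig;
    import org.apache.iceberg.util.SerializationUtil;

    public class FilterTransferSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);

        // AM side: getSplits() stores the (small) table-level filter once in the job conf
        Expression filter = Expressions.greaterThan("id", 123);
        conf.set(InputFormatConfig.FILTER_EXPRESSION, SerializationUtil.serializeToBase64(filter));

        // Executor side: icebergDataFilterFromHiveConf() reads it back; the per-task
        // residual is then derived locally via residualForTask()
        Expression roundTripped = SerializationUtil.deserializeFromBase64(
            conf.get(InputFormatConfig.FILTER_EXPRESSION));
        System.out.println(roundTripped); // prints roughly: ref(name="id") > 123
      }
    }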