Posted to commits@iceberg.apache.org by rd...@apache.org on 2020/04/20 18:33:16 UTC
[incubator-iceberg] branch master updated: Add residual evaluation for MR reader (#931)
This is an automated email from the ASF dual-hosted git repository.
rdsr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new af25cbe Add residual evaluation for MR reader (#931)
af25cbe is described below
commit af25cbedac34c09633a66a2c73d68f3f4822d64a
Author: Chen, Junjie <ch...@gmail.com>
AuthorDate: Tue Apr 21 02:33:04 2020 +0800
Add residual evaluation for MR reader (#931)
---
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 28 ++++++---
.../mr/mapreduce/TestIcebergInputFormat.java | 73 +++++++++++++++++++---
2 files changed, 84 insertions(+), 17 deletions(-)
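For context: when Iceberg plans a scan with a row filter, partition pruning satisfies whatever part of the filter the partition spec can answer, and each FileScanTask carries the leftover as task.residual(). With the filter used in the tests below, Expressions.and(Expressions.equal("date", "2020-03-20"), Expressions.equal("id", 123)), pruning on the date partition satisfies the first term, and ref(name="id") == 123 remains as the residual that must be checked row by row. A minimal sketch of that check, using the same Iceberg APIs this patch imports (the helper name filterByResidual is hypothetical):

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.expressions.Evaluator;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.io.CloseableIterable;

    static <T extends StructLike> CloseableIterable<T> filterByResidual(
        CloseableIterable<T> records, FileScanTask task, Schema readSchema) {
      Expression residual = task.residual();
      if (residual == null || residual == Expressions.alwaysTrue()) {
        return records;  // nothing left to evaluate; pruning answered the whole filter
      }
      // bind the residual to the read schema and test each record as a StructLike
      Evaluator filter = new Evaluator(readSchema.asStruct(), residual, true /* caseSensitive */);
      return CloseableIterable.filter(records, record -> filter.eval(record));
    }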
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 4f31abc..0d35644 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -62,6 +62,7 @@ import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopInputFile;
@@ -243,11 +244,12 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
}
splits = Lists.newArrayList();
- boolean applyResidualFiltering = !conf.getBoolean(SKIP_RESIDUAL_FILTERING, false);
+ boolean applyResidual = !conf.getBoolean(SKIP_RESIDUAL_FILTERING, false);
+ InMemoryDataModel model = conf.getEnum(IN_MEMORY_DATA_MODEL, InMemoryDataModel.GENERIC);
try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
tasksIterable.forEach(task -> {
- if (applyResidualFiltering) {
- //TODO: We do not support residual evaluation yet
+ if (applyResidual && (model == InMemoryDataModel.HIVE || model == InMemoryDataModel.PIG)) {
+ //TODO: residual evaluation is not yet supported for the HIVE and PIG in-memory data models
checkResiduals(task);
}
splits.add(new IcebergSplit(conf, task));
@@ -316,6 +318,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
currentCloseable.close();
currentIterator = open(tasks.next());
} else {
+ currentCloseable.close();
return false;
}
}
@@ -394,7 +397,6 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
String.format("Cannot read %s file: %s", file.format().name(), file.path()));
}
currentCloseable = iterable;
- //TODO: Apply residual filtering before returning the iterator
return iterable.iterator();
}
@@ -441,6 +443,18 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
return row;
}
+ private CloseableIterable<T> applyResidualFiltering(CloseableIterable<T> iter, Expression residual,
+ Schema readSchema) {
+ boolean applyResidual = !context.getConfiguration().getBoolean(SKIP_RESIDUAL_FILTERING, false);
+
+ if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) {
+ Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive);
+ return CloseableIterable.filter(iter, record -> filter.eval((StructLike) record));
+ } else {
+ return iter;
+ }
+ }
+
private CloseableIterable<T> newAvroIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
.project(readSchema)
@@ -457,7 +471,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
case GENERIC:
avroReadBuilder.createReaderFunc(DataReader::create);
}
- return avroReadBuilder.build();
+ return applyResidualFiltering(avroReadBuilder.build(), task.residual(), readSchema);
}
private CloseableIterable<T> newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
@@ -479,7 +493,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
parquetReadBuilder.createReaderFunc(
fileSchema -> GenericParquetReaders.buildReader(readSchema, fileSchema));
}
- return parquetReadBuilder.build();
+ return applyResidualFiltering(parquetReadBuilder.build(), task.residual(), readSchema);
}
private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
@@ -497,7 +511,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
orcReadBuilder.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema));
}
- return orcReadBuilder.build();
+ return applyResidualFiltering(orcReadBuilder.build(), task.residual(), readSchema);
}
}
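The test changes below exercise both paths end to end: residuals applied by default, and explicitly skipped. For reference, the caller-side setup is just the builder calls used in the tests (conf and location are assumed to hold a Hadoop Configuration and the table path):

    Job job = Job.getInstance(conf);
    IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job);
    configBuilder.readFrom(location.toString())
        .filter(Expressions.and(
            Expressions.equal("date", "2020-03-20"),
            Expressions.equal("id", 123)));
    // residual filtering is on by default; opt out to get only file-level pruning:
    // configBuilder.skipResidualFiltering();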
diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
index 1f84890..36be820 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
@@ -184,24 +184,77 @@ public class TestIcebergInputFormat {
Table table = tables.create(SCHEMA, SPEC,
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
location.toString());
- List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 2, 0L);
- expectedRecords.get(0).set(2, "2020-03-20");
- expectedRecords.get(1).set(2, "2020-03-20");
- DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, expectedRecords);
+ List<Record> writeRecords = RandomGenericData.generate(table.schema(), 2, 0L);
+ writeRecords.get(0).set(1, 123L);
+ writeRecords.get(0).set(2, "2020-03-20");
+ writeRecords.get(1).set(1, 456L);
+ writeRecords.get(1).set(2, "2020-03-20");
+
+ List<Record> expectedRecords = new ArrayList<>();
+ expectedRecords.add(writeRecords.get(0));
+
+ DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format, writeRecords);
+ DataFile dataFile2 = writeFile(table, Row.of("2020-03-21", 0), format,
+ RandomGenericData.generate(table.schema(), 2, 0L));
table.newAppend()
- .appendFile(dataFile)
+ .appendFile(dataFile1)
+ .appendFile(dataFile2)
.commit();
Job job = Job.getInstance(conf);
IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job);
configBuilder.readFrom(location.toString())
- .filter(Expressions.and(
- Expressions.equal("date", "2020-03-20"),
- Expressions.equal("id", 0)));
+ .filter(Expressions.and(
+ Expressions.equal("date", "2020-03-20"),
+ Expressions.equal("id", 123)));
+ validate(job, expectedRecords);
+
+ // skip residual filtering
+ job = Job.getInstance(conf);
+ configBuilder = IcebergInputFormat.configure(job);
+ configBuilder.skipResidualFiltering().readFrom(location.toString())
+ .filter(Expressions.and(
+ Expressions.equal("date", "2020-03-20"),
+ Expressions.equal("id", 123)));
+ validate(job, writeRecords);
+ }
+ @Test
+ public void testFailedResidualFiltering() throws Exception {
+ File location = temp.newFolder(format.name());
+ Assert.assertTrue(location.delete());
+ Table table = tables.create(SCHEMA, SPEC,
+ ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+ location.toString());
+ List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 2, 0L);
+ expectedRecords.get(0).set(2, "2020-03-20");
+ expectedRecords.get(1).set(2, "2020-03-20");
+
+ DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format, expectedRecords);
+ table.newAppend()
+ .appendFile(dataFile1)
+ .commit();
+
+ Job jobShouldFail1 = Job.getInstance(conf);
+ IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(jobShouldFail1);
+ configBuilder.useHiveRows().readFrom(location.toString())
+ .filter(Expressions.and(
+ Expressions.equal("date", "2020-03-20"),
+ Expressions.equal("id", 0)));
+ AssertHelpers.assertThrows(
+ "Residuals are not evaluated today for Iceberg Generics In memory model of HIVE",
+ UnsupportedOperationException.class, "Filter expression ref(name=\"id\") == 0 is not completely satisfied.",
+ () -> validate(jobShouldFail1, expectedRecords));
+
+ Job jobShouldFail2 = Job.getInstance(conf);
+ configBuilder = IcebergInputFormat.configure(jobShouldFail2);
+ configBuilder.usePigTuples().readFrom(location.toString())
+ .filter(Expressions.and(
+ Expressions.equal("date", "2020-03-20"),
+ Expressions.equal("id", 0)));
AssertHelpers.assertThrows(
- "Residuals are not evaluated today for Iceberg Generics In memory model",
+ "Residuals are not evaluated today for Iceberg Generics In memory model of PIG",
UnsupportedOperationException.class, "Filter expression ref(name=\"id\") == 0 is not completely satisfied.",
- () -> validate(job, expectedRecords));
+ () -> validate(jobShouldFail2, expectedRecords));
}
@Test
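Note that for the HIVE and PIG in-memory data models the planner still rejects scans whose residual is not fully satisfied, which is what the two assertThrows cases above verify. The checkResiduals body is not part of this diff, so the following is only an assumed reconstruction from the test expectations:

    // assumed shape of checkResiduals, inferred from the assertion messages above;
    // the real implementation is not shown in this diff
    private static void checkResiduals(CombinedScanTask task) {
      task.files().forEach(fileScanTask -> {
        Expression residual = fileScanTask.residual();
        if (residual != null && residual != Expressions.alwaysTrue()) {
          throw new UnsupportedOperationException(
              String.format("Filter expression %s is not completely satisfied.", residual));
        }
      });
    }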