You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by rd...@apache.org on 2020/04/20 18:33:16 UTC

[incubator-iceberg] branch master updated: Add residual evaluation for MR reader (#931)

This is an automated email from the ASF dual-hosted git repository.

rdsr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new af25cbe  Add residual evaluation for MR reader (#931)
af25cbe is described below

commit af25cbedac34c09633a66a2c73d68f3f4822d64a
Author: Chen, Junjie <ch...@gmail.com>
AuthorDate: Tue Apr 21 02:33:04 2020 +0800

    Add residual evaluation for MR reader (#931)
---
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   | 28 ++++++---
 .../mr/mapreduce/TestIcebergInputFormat.java       | 73 +++++++++++++++++++---
 2 files changed, 84 insertions(+), 17 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 4f31abc..0d35644 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -62,6 +62,7 @@ import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopInputFile;
@@ -243,11 +244,12 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
     }
 
     splits = Lists.newArrayList();
-    boolean applyResidualFiltering = !conf.getBoolean(SKIP_RESIDUAL_FILTERING, false);
+    boolean applyResidual = !conf.getBoolean(SKIP_RESIDUAL_FILTERING, false);
+    InMemoryDataModel model = conf.getEnum(IN_MEMORY_DATA_MODEL, InMemoryDataModel.GENERIC);
     try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
       tasksIterable.forEach(task -> {
-        if (applyResidualFiltering) {
-          //TODO: We do not support residual evaluation yet
+        if (applyResidual && (model == InMemoryDataModel.HIVE || model == InMemoryDataModel.PIG)) {
+          //TODO: We do not support residual evaluation for HIVE and PIG in memory data model yet
           checkResiduals(task);
         }
         splits.add(new IcebergSplit(conf, task));
@@ -316,6 +318,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
           currentCloseable.close();
           currentIterator = open(tasks.next());
         } else {
+          currentCloseable.close();
           return false;
         }
       }
@@ -394,7 +397,6 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
               String.format("Cannot read %s file: %s", file.format().name(), file.path()));
       }
       currentCloseable = iterable;
-      //TODO: Apply residual filtering before returning the iterator
       return iterable.iterator();
     }
 
@@ -441,6 +443,18 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       return row;
     }
 
+    private CloseableIterable<T> applyResidualFiltering(CloseableIterable<T> iter, Expression residual,
+                                                        Schema readSchema) { // wrap iter in a residual row filter
+      boolean applyResidual = !context.getConfiguration().getBoolean(SKIP_RESIDUAL_FILTERING, false);
+      // Filter only when skipping is not configured and the residual is neither null nor alwaysTrue().
+      if (applyResidual && residual != null && residual != Expressions.alwaysTrue()) { // reference comparison
+        Evaluator filter = new Evaluator(readSchema.asStruct(), residual, caseSensitive);
+        return CloseableIterable.filter(iter, record -> filter.eval((StructLike) record)); // rows cast to StructLike
+      } else {
+        return iter; // nothing to evaluate: hand back the input iterable unchanged
+      }
+    }
+
     private CloseableIterable<T> newAvroIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
       Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
           .project(readSchema)
@@ -457,7 +471,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
         case GENERIC:
           avroReadBuilder.createReaderFunc(DataReader::create);
       }
-      return avroReadBuilder.build();
+      return applyResidualFiltering(avroReadBuilder.build(), task.residual(), readSchema);
     }
 
     private CloseableIterable<T> newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
@@ -479,7 +493,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
           parquetReadBuilder.createReaderFunc(
               fileSchema -> GenericParquetReaders.buildReader(readSchema, fileSchema));
       }
-      return parquetReadBuilder.build();
+      return applyResidualFiltering(parquetReadBuilder.build(), task.residual(), readSchema);
     }
 
     private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
@@ -497,7 +511,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
           orcReadBuilder.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema));
       }
 
-      return orcReadBuilder.build();
+      return applyResidualFiltering(orcReadBuilder.build(), task.residual(), readSchema);
     }
   }
 
diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
index 1f84890..36be820 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
@@ -184,24 +184,77 @@ public class TestIcebergInputFormat {
     Table table = tables.create(SCHEMA, SPEC,
                                 ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
                                 location.toString());
-    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 2, 0L);
-    expectedRecords.get(0).set(2, "2020-03-20");
-    expectedRecords.get(1).set(2, "2020-03-20");
-    DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, expectedRecords);
+    List<Record> writeRecords = RandomGenericData.generate(table.schema(), 2, 0L);
+    writeRecords.get(0).set(1, 123L);
+    writeRecords.get(0).set(2, "2020-03-20");
+    writeRecords.get(1).set(1, 456L);
+    writeRecords.get(1).set(2, "2020-03-20");
+
+    List<Record> expectedRecords = new ArrayList<>();
+    expectedRecords.add(writeRecords.get(0));
+
+    DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format, writeRecords);
+    DataFile dataFile2 = writeFile(table, Row.of("2020-03-21", 0), format,
+        RandomGenericData.generate(table.schema(), 2, 0L));
     table.newAppend()
-         .appendFile(dataFile)
+         .appendFile(dataFile1)
+         .appendFile(dataFile2)
          .commit();
     Job job = Job.getInstance(conf);
     IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(job);
     configBuilder.readFrom(location.toString())
-                 .filter(Expressions.and(
-                     Expressions.equal("date", "2020-03-20"),
-                     Expressions.equal("id", 0)));
+        .filter(Expressions.and(
+            Expressions.equal("date", "2020-03-20"),
+            Expressions.equal("id", 123)));
+    validate(job, expectedRecords);
+
+    // skip residual filtering
+    job = Job.getInstance(conf);
+    configBuilder = IcebergInputFormat.configure(job);
+    configBuilder.skipResidualFiltering().readFrom(location.toString())
+        .filter(Expressions.and(
+            Expressions.equal("date", "2020-03-20"),
+            Expressions.equal("id", 123)));
+    validate(job, writeRecords);
+  }
 
+  @Test
+  public void testFailedResidualFiltering() throws Exception {
+    File location = temp.newFolder(format.name());
+    Assert.assertTrue(location.delete());
+    Table table = tables.create(SCHEMA, SPEC,
+        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
+        location.toString());
+    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 2, 0L);
+    expectedRecords.get(0).set(2, "2020-03-20");
+    expectedRecords.get(1).set(2, "2020-03-20");
+
+    DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format, expectedRecords);
+    table.newAppend()
+        .appendFile(dataFile1)
+        .commit();
+
+    Job jobShouldFail1 = Job.getInstance(conf);
+    IcebergInputFormat.ConfigBuilder configBuilder = IcebergInputFormat.configure(jobShouldFail1);
+    configBuilder.useHiveRows().readFrom(location.toString())
+        .filter(Expressions.and(
+            Expressions.equal("date", "2020-03-20"),
+            Expressions.equal("id", 0)));
+    AssertHelpers.assertThrows(
+        "Residuals are not evaluated today for Iceberg Generics In memory model of HIVE",
+        UnsupportedOperationException.class, "Filter expression ref(name=\"id\") == 0 is not completely satisfied.",
+        () -> validate(jobShouldFail1, expectedRecords));
+
+    Job jobShouldFail2 = Job.getInstance(conf);
+    configBuilder = IcebergInputFormat.configure(jobShouldFail2);
+    configBuilder.usePigTuples().readFrom(location.toString())
+        .filter(Expressions.and(
+            Expressions.equal("date", "2020-03-20"),
+            Expressions.equal("id", 0)));
     AssertHelpers.assertThrows(
-        "Residuals are not evaluated today for Iceberg Generics In memory model",
+        "Residuals are not evaluated today for Iceberg Generics In memory model of PIG",
         UnsupportedOperationException.class, "Filter expression ref(name=\"id\") == 0 is not completely satisfied.",
-        () -> validate(job, expectedRecords));
+        () -> validate(jobShouldFail2, expectedRecords));
   }
 
   @Test