Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/03/15 19:39:49 UTC

[iceberg] branch master updated: Spark: Backport of #4278 - Fix Overlap Function (#4327)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e9a287b  Spark: Backport of #4278 - Fix Overlap Function (#4327)
e9a287b is described below

commit e9a287b7f68d8e7138c47dcb686d3709bdde402e
Author: Russell Spitzer <rs...@apple.com>
AuthorDate: Tue Mar 15 14:39:30 2022 -0500

    Spark: Backport of #4278 - Fix Overlap Function (#4327)
---
 .../spark/actions/TestRewriteDataFilesAction.java  | 43 +++++++++++++---------
 .../spark/actions/TestRewriteDataFilesAction.java  | 43 +++++++++++++---------
 2 files changed, 52 insertions(+), 34 deletions(-)
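
In summary, this backport applies the same test fix to the Spark 3.0 and 3.1 copies of
TestRewriteDataFilesAction: the overlap helper is renamed from getOverlappingFiles to
checkForOverlappingFiles, bound extraction moves into a new boundsOf helper, a
Preconditions guard now fails fast when a partition holds only one data file (a case in
which no overlap could ever be detected), and testSortCustomSortOrderRequiresRepartition
divides the target file size by the partition count so that each partition actually
produces multiple files.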

diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 06bb70c..a907a80 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -64,6 +64,7 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -838,8 +839,9 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
 
   @Test
   public void testSortCustomSortOrderRequiresRepartition() {
+    int partitions = 4;
     Table table = createTable();
-    writeRecords(20, SCALE, 20);
+    writeRecords(20, SCALE, partitions);
     shouldHaveLastCommitUnsorted(table, "c3");
 
     // Add a partition column so this requires repartitioning
@@ -854,7 +856,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
         basicRewrite(table)
             .sort(SortOrder.builderFor(table.schema()).asc("c3").build())
             .option(SortStrategy.REWRITE_ALL, "true")
-            .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table)))
+            .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) / partitions))
             .execute();
 
     Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1);
@@ -989,39 +991,47 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
 
   protected <T> void shouldHaveLastCommitSorted(Table table, String column) {
     List<Pair<Pair<T, T>, Pair<T, T>>>
-        overlappingFiles = getOverlappingFiles(table, column);
+        overlappingFiles = checkForOverlappingFiles(table, column);
 
     Assert.assertEquals("Found overlapping files", Collections.emptyList(), overlappingFiles);
   }
 
   protected <T> void shouldHaveLastCommitUnsorted(Table table, String column) {
     List<Pair<Pair<T, T>, Pair<T, T>>>
-        overlappingFiles = getOverlappingFiles(table, column);
+        overlappingFiles = checkForOverlappingFiles(table, column);
 
     Assert.assertNotEquals("Found no overlapping files", Collections.emptyList(), overlappingFiles);
   }
 
-  private <T> List<Pair<Pair<T, T>, Pair<T, T>>> getOverlappingFiles(Table table, String column) {
+  private <T> Pair<T, T> boundsOf(DataFile file, NestedField field, Class<T> javaClass) {
+    int columnId = field.fieldId();
+    return Pair.of(javaClass.cast(Conversions.fromByteBuffer(field.type(), file.lowerBounds().get(columnId))),
+        javaClass.cast(Conversions.fromByteBuffer(field.type(), file.upperBounds().get(columnId))));
+  }
+
+  private <T> List<Pair<Pair<T, T>, Pair<T, T>>> checkForOverlappingFiles(Table table, String column) {
     table.refresh();
     NestedField field = table.schema().caseInsensitiveFindField(column);
-    int columnId = field.fieldId();
     Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();
+
     Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles())
         .collect(Collectors.groupingBy(DataFile::partition));
 
     Stream<Pair<Pair<T, T>, Pair<T, T>>> overlaps =
         filesByPartition.entrySet().stream().flatMap(entry -> {
-          List<Pair<T, T>> columnBounds =
-              entry.getValue().stream()
-                  .map(file -> Pair.of(
-                      javaClass.cast(Conversions.fromByteBuffer(field.type(), file.lowerBounds().get(columnId))),
-                      javaClass.cast(Conversions.fromByteBuffer(field.type(), file.upperBounds().get(columnId)))))
-                  .collect(Collectors.toList());
+          List<DataFile> datafiles = entry.getValue();
+          Preconditions.checkArgument(datafiles.size() > 1,
+              "This test is checking for overlaps in a situation where no overlaps can actually occur because the " +
+                  "partition %s does not contain multiple datafiles", entry.getKey());
+
+          List<Pair<Pair<T, T>, Pair<T, T>>> boundComparisons = Lists.cartesianProduct(datafiles, datafiles).stream()
+              .filter(tuple -> tuple.get(0) != tuple.get(1))
+              .map(tuple -> Pair.of(boundsOf(tuple.get(0), field, javaClass), boundsOf(tuple.get(1), field, javaClass)))
+              .collect(Collectors.toList());
 
           Comparator<T> comparator = Comparators.forType(field.type().asPrimitiveType());
 
-          List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles = columnBounds.stream()
-              .flatMap(left -> columnBounds.stream().map(right -> Pair.of(left, right)))
+          List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles = boundComparisons.stream()
               .filter(filePair -> {
                 Pair<T, T> left = filePair.first();
                 T lMin = left.first();
@@ -1034,9 +1044,8 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
                     (comparator.compare(rMax, lMax) >= 0 && comparator.compare(rMin, lMax) >= 0) ||
                         (comparator.compare(lMax, rMax) >= 0 && comparator.compare(lMin, rMax) >= 0);
 
-                return (left != right) && !boundsDoNotOverlap;
-              })
-              .collect(Collectors.toList());
+                return !boundsDoNotOverlap;
+              }).collect(Collectors.toList());
           return overlappingFiles.stream();
         });
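
To make the pairwise comparison above easier to follow, here is a minimal, self-contained
sketch of the same non-overlap test. The Bounds class and the sample ranges are
hypothetical stand-ins for the Pair<T, T> column bounds read from each DataFile; the
boundsDoNotOverlap expression mirrors the committed filter.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class OverlapSketch {
      // Hypothetical stand-in for the Pair<T, T> of lower/upper column bounds.
      static final class Bounds {
        final int min;
        final int max;

        Bounds(int min, int max) {
          this.min = min;
          this.max = max;
        }
      }

      static List<String> overlappingPairs(List<Bounds> files, Comparator<Integer> comparator) {
        List<String> overlaps = new ArrayList<>();
        for (int i = 0; i < files.size(); i++) {
          for (int j = 0; j < files.size(); j++) {
            if (i == j) {
              continue; // same role as the tuple.get(0) != tuple.get(1) filter: skip self-pairs
            }
            Bounds left = files.get(i);
            Bounds right = files.get(j);
            // Same disjointness test as the committed filter: the right range sits
            // entirely at or above the left max, or the left range sits entirely
            // at or above the right max.
            boolean boundsDoNotOverlap =
                (comparator.compare(right.max, left.max) >= 0 && comparator.compare(right.min, left.max) >= 0)
                    || (comparator.compare(left.max, right.max) >= 0 && comparator.compare(left.min, right.max) >= 0);
            if (!boundsDoNotOverlap) {
              overlaps.add(i + "<->" + j);
            }
          }
        }
        return overlaps;
      }

      public static void main(String[] args) {
        // [0, 10] overlaps [5, 20]; [5, 20] and [20, 30] only touch at 20 and are
        // treated as non-overlapping because the comparisons use >= 0.
        List<Bounds> files = List.of(new Bounds(0, 10), new Bounds(5, 20), new Bounds(20, 30));
        System.out.println(overlappingPairs(files, Comparator.naturalOrder())); // prints [0<->1, 1<->0]
      }
    }

Note that because the committed helper builds the pairs with Lists.cartesianProduct, each
unordered overlap is reported twice, once per ordering; that is harmless for the emptiness
assertions in shouldHaveLastCommitSorted and shouldHaveLastCommitUnsorted.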
 
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 06bb70c..a907a80 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -64,6 +64,7 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -838,8 +839,9 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
 
   @Test
   public void testSortCustomSortOrderRequiresRepartition() {
+    int partitions = 4;
     Table table = createTable();
-    writeRecords(20, SCALE, 20);
+    writeRecords(20, SCALE, partitions);
     shouldHaveLastCommitUnsorted(table, "c3");
 
     // Add a partition column so this requires repartitioning
@@ -854,7 +856,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
         basicRewrite(table)
             .sort(SortOrder.builderFor(table.schema()).asc("c3").build())
             .option(SortStrategy.REWRITE_ALL, "true")
-            .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table)))
+            .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) / partitions))
             .execute();
 
     Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1);
@@ -989,39 +991,47 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
 
   protected <T> void shouldHaveLastCommitSorted(Table table, String column) {
     List<Pair<Pair<T, T>, Pair<T, T>>>
-        overlappingFiles = getOverlappingFiles(table, column);
+        overlappingFiles = checkForOverlappingFiles(table, column);
 
     Assert.assertEquals("Found overlapping files", Collections.emptyList(), overlappingFiles);
   }
 
   protected <T> void shouldHaveLastCommitUnsorted(Table table, String column) {
     List<Pair<Pair<T, T>, Pair<T, T>>>
-        overlappingFiles = getOverlappingFiles(table, column);
+        overlappingFiles = checkForOverlappingFiles(table, column);
 
     Assert.assertNotEquals("Found no overlapping files", Collections.emptyList(), overlappingFiles);
   }
 
-  private <T> List<Pair<Pair<T, T>, Pair<T, T>>> getOverlappingFiles(Table table, String column) {
+  private <T> Pair<T, T> boundsOf(DataFile file, NestedField field, Class<T> javaClass) {
+    int columnId = field.fieldId();
+    return Pair.of(javaClass.cast(Conversions.fromByteBuffer(field.type(), file.lowerBounds().get(columnId))),
+        javaClass.cast(Conversions.fromByteBuffer(field.type(), file.upperBounds().get(columnId))));
+  }
+
+  private <T> List<Pair<Pair<T, T>, Pair<T, T>>> checkForOverlappingFiles(Table table, String column) {
     table.refresh();
     NestedField field = table.schema().caseInsensitiveFindField(column);
-    int columnId = field.fieldId();
     Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();
+
     Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles())
         .collect(Collectors.groupingBy(DataFile::partition));
 
     Stream<Pair<Pair<T, T>, Pair<T, T>>> overlaps =
         filesByPartition.entrySet().stream().flatMap(entry -> {
-          List<Pair<T, T>> columnBounds =
-              entry.getValue().stream()
-                  .map(file -> Pair.of(
-                      javaClass.cast(Conversions.fromByteBuffer(field.type(), file.lowerBounds().get(columnId))),
-                      javaClass.cast(Conversions.fromByteBuffer(field.type(), file.upperBounds().get(columnId)))))
-                  .collect(Collectors.toList());
+          List<DataFile> datafiles = entry.getValue();
+          Preconditions.checkArgument(datafiles.size() > 1,
+              "This test is checking for overlaps in a situation where no overlaps can actually occur because the " +
+                  "partition %s does not contain multiple datafiles", entry.getKey());
+
+          List<Pair<Pair<T, T>, Pair<T, T>>> boundComparisons = Lists.cartesianProduct(datafiles, datafiles).stream()
+              .filter(tuple -> tuple.get(0) != tuple.get(1))
+              .map(tuple -> Pair.of(boundsOf(tuple.get(0), field, javaClass), boundsOf(tuple.get(1), field, javaClass)))
+              .collect(Collectors.toList());
 
           Comparator<T> comparator = Comparators.forType(field.type().asPrimitiveType());
 
-          List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles = columnBounds.stream()
-              .flatMap(left -> columnBounds.stream().map(right -> Pair.of(left, right)))
+          List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles = boundComparisons.stream()
               .filter(filePair -> {
                 Pair<T, T> left = filePair.first();
                 T lMin = left.first();
@@ -1034,9 +1044,8 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
                     (comparator.compare(rMax, lMax) >= 0 && comparator.compare(rMin, lMax) >= 0) ||
                         (comparator.compare(lMax, rMax) >= 0 && comparator.compare(lMin, rMax) >= 0);
 
-                return (left != right) && !boundsDoNotOverlap;
-              })
-              .collect(Collectors.toList());
+                return !boundsDoNotOverlap;
+              }).collect(Collectors.toList());
           return overlappingFiles.stream();
         });
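
Why the writeRecords and TARGET_FILE_SIZE_BYTES changes matter: with the old settings a
rewrite could leave a single data file per partition, and a lone file trivially has no
overlaps, so shouldHaveLastCommitSorted could pass vacuously. Dividing the target size by
the partition count forces several output files per partition. With purely illustrative
numbers: if averageFileSize(table) returned 400 and partitions is 4, the target becomes
100, so each partition is rewritten into roughly four files, and the new Preconditions
guard then verifies that every partition really does contain more than one data file.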