You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/03/15 19:39:49 UTC
[iceberg] branch master updated: Spark: Backport of #4278 - Fix Overlap Function (#4327)
This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e9a287b Spark: Backport of #4278 - Fix Overlap Function (#4327)
e9a287b is described below
commit e9a287b7f68d8e7138c47dcb686d3709bdde402e
Author: Russell Spitzer <rs...@apple.com>
AuthorDate: Tue Mar 15 14:39:30 2022 -0500
Spark: Backport of #4278 - Fix Overlap Function (#4327)
---
.../spark/actions/TestRewriteDataFilesAction.java | 43 +++++++++++++---------
.../spark/actions/TestRewriteDataFilesAction.java | 43 +++++++++++++---------
2 files changed, 52 insertions(+), 34 deletions(-)
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 06bb70c..a907a80 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -64,6 +64,7 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -838,8 +839,9 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
@Test
public void testSortCustomSortOrderRequiresRepartition() {
+ int partitions = 4;
Table table = createTable();
- writeRecords(20, SCALE, 20);
+ writeRecords(20, SCALE, partitions);
shouldHaveLastCommitUnsorted(table, "c3");
// Add a partition column so this requires repartitioning
@@ -854,7 +856,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
basicRewrite(table)
.sort(SortOrder.builderFor(table.schema()).asc("c3").build())
.option(SortStrategy.REWRITE_ALL, "true")
- .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table)))
+ .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) / partitions))
.execute();
Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1);
@@ -989,39 +991,47 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
protected <T> void shouldHaveLastCommitSorted(Table table, String column) {
List<Pair<Pair<T, T>, Pair<T, T>>>
- overlappingFiles = getOverlappingFiles(table, column);
+ overlappingFiles = checkForOverlappingFiles(table, column);
Assert.assertEquals("Found overlapping files", Collections.emptyList(), overlappingFiles);
}
protected <T> void shouldHaveLastCommitUnsorted(Table table, String column) {
List<Pair<Pair<T, T>, Pair<T, T>>>
- overlappingFiles = getOverlappingFiles(table, column);
+ overlappingFiles = checkForOverlappingFiles(table, column);
Assert.assertNotEquals("Found no overlapping files", Collections.emptyList(), overlappingFiles);
}
- private <T> List<Pair<Pair<T, T>, Pair<T, T>>> getOverlappingFiles(Table table, String column) {
+ private <T> Pair<T, T> boundsOf(DataFile file, NestedField field, Class<T> javaClass) {
+ int columnId = field.fieldId();
+ return Pair.of(javaClass.cast(Conversions.fromByteBuffer(field.type(), file.lowerBounds().get(columnId))),
+ javaClass.cast(Conversions.fromByteBuffer(field.type(), file.upperBounds().get(columnId))));
+ }
+
+ private <T> List<Pair<Pair<T, T>, Pair<T, T>>> checkForOverlappingFiles(Table table, String column) {
table.refresh();
NestedField field = table.schema().caseInsensitiveFindField(column);
- int columnId = field.fieldId();
Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();
+
Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles())
.collect(Collectors.groupingBy(DataFile::partition));
Stream<Pair<Pair<T, T>, Pair<T, T>>> overlaps =
filesByPartition.entrySet().stream().flatMap(entry -> {
- List<Pair<T, T>> columnBounds =
- entry.getValue().stream()
- .map(file -> Pair.of(
- javaClass.cast(Conversions.fromByteBuffer(field.type(), file.lowerBounds().get(columnId))),
- javaClass.cast(Conversions.fromByteBuffer(field.type(), file.upperBounds().get(columnId)))))
- .collect(Collectors.toList());
+ List<DataFile> datafiles = entry.getValue();
+ Preconditions.checkArgument(datafiles.size() > 1,
+ "This test is checking for overlaps in a situation where no overlaps can actually occur because the " +
+ "partition %s does not contain multiple datafiles", entry.getKey());
+
+ List<Pair<Pair<T, T>, Pair<T, T>>> boundComparisons = Lists.cartesianProduct(datafiles, datafiles).stream()
+ .filter(tuple -> tuple.get(0) != tuple.get(1))
+ .map(tuple -> Pair.of(boundsOf(tuple.get(0), field, javaClass), boundsOf(tuple.get(1), field, javaClass)))
+ .collect(Collectors.toList());
Comparator<T> comparator = Comparators.forType(field.type().asPrimitiveType());
- List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles = columnBounds.stream()
- .flatMap(left -> columnBounds.stream().map(right -> Pair.of(left, right)))
+ List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles = boundComparisons.stream()
.filter(filePair -> {
Pair<T, T> left = filePair.first();
T lMin = left.first();
@@ -1034,9 +1044,8 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
(comparator.compare(rMax, lMax) >= 0 && comparator.compare(rMin, lMax) >= 0) ||
(comparator.compare(lMax, rMax) >= 0 && comparator.compare(lMin, rMax) >= 0);
- return (left != right) && !boundsDoNotOverlap;
- })
- .collect(Collectors.toList());
+ return !boundsDoNotOverlap;
+ }).collect(Collectors.toList());
return overlappingFiles.stream();
});
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 06bb70c..a907a80 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -64,6 +64,7 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -838,8 +839,9 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
@Test
public void testSortCustomSortOrderRequiresRepartition() {
+ int partitions = 4;
Table table = createTable();
- writeRecords(20, SCALE, 20);
+ writeRecords(20, SCALE, partitions);
shouldHaveLastCommitUnsorted(table, "c3");
// Add a partition column so this requires repartitioning
@@ -854,7 +856,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
basicRewrite(table)
.sort(SortOrder.builderFor(table.schema()).asc("c3").build())
.option(SortStrategy.REWRITE_ALL, "true")
- .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table)))
+ .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) / partitions))
.execute();
Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1);
@@ -989,39 +991,47 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
protected <T> void shouldHaveLastCommitSorted(Table table, String column) {
List<Pair<Pair<T, T>, Pair<T, T>>>
- overlappingFiles = getOverlappingFiles(table, column);
+ overlappingFiles = checkForOverlappingFiles(table, column);
Assert.assertEquals("Found overlapping files", Collections.emptyList(), overlappingFiles);
}
protected <T> void shouldHaveLastCommitUnsorted(Table table, String column) {
List<Pair<Pair<T, T>, Pair<T, T>>>
- overlappingFiles = getOverlappingFiles(table, column);
+ overlappingFiles = checkForOverlappingFiles(table, column);
Assert.assertNotEquals("Found no overlapping files", Collections.emptyList(), overlappingFiles);
}
- private <T> List<Pair<Pair<T, T>, Pair<T, T>>> getOverlappingFiles(Table table, String column) {
+ private <T> Pair<T, T> boundsOf(DataFile file, NestedField field, Class<T> javaClass) {
+ int columnId = field.fieldId();
+ return Pair.of(javaClass.cast(Conversions.fromByteBuffer(field.type(), file.lowerBounds().get(columnId))),
+ javaClass.cast(Conversions.fromByteBuffer(field.type(), file.upperBounds().get(columnId))));
+ }
+
+ private <T> List<Pair<Pair<T, T>, Pair<T, T>>> checkForOverlappingFiles(Table table, String column) {
table.refresh();
NestedField field = table.schema().caseInsensitiveFindField(column);
- int columnId = field.fieldId();
Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();
+
Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles())
.collect(Collectors.groupingBy(DataFile::partition));
Stream<Pair<Pair<T, T>, Pair<T, T>>> overlaps =
filesByPartition.entrySet().stream().flatMap(entry -> {
- List<Pair<T, T>> columnBounds =
- entry.getValue().stream()
- .map(file -> Pair.of(
- javaClass.cast(Conversions.fromByteBuffer(field.type(), file.lowerBounds().get(columnId))),
- javaClass.cast(Conversions.fromByteBuffer(field.type(), file.upperBounds().get(columnId)))))
- .collect(Collectors.toList());
+ List<DataFile> datafiles = entry.getValue();
+ Preconditions.checkArgument(datafiles.size() > 1,
+ "This test is checking for overlaps in a situation where no overlaps can actually occur because the " +
+ "partition %s does not contain multiple datafiles", entry.getKey());
+
+ List<Pair<Pair<T, T>, Pair<T, T>>> boundComparisons = Lists.cartesianProduct(datafiles, datafiles).stream()
+ .filter(tuple -> tuple.get(0) != tuple.get(1))
+ .map(tuple -> Pair.of(boundsOf(tuple.get(0), field, javaClass), boundsOf(tuple.get(1), field, javaClass)))
+ .collect(Collectors.toList());
Comparator<T> comparator = Comparators.forType(field.type().asPrimitiveType());
- List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles = columnBounds.stream()
- .flatMap(left -> columnBounds.stream().map(right -> Pair.of(left, right)))
+ List<Pair<Pair<T, T>, Pair<T, T>>> overlappingFiles = boundComparisons.stream()
.filter(filePair -> {
Pair<T, T> left = filePair.first();
T lMin = left.first();
@@ -1034,9 +1044,8 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
(comparator.compare(rMax, lMax) >= 0 && comparator.compare(rMin, lMax) >= 0) ||
(comparator.compare(lMax, rMax) >= 0 && comparator.compare(lMin, rMax) >= 0);
- return (left != right) && !boundsDoNotOverlap;
- })
- .collect(Collectors.toList());
+ return !boundsDoNotOverlap;
+ }).collect(Collectors.toList());
return overlappingFiles.stream();
});