You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/05/17 15:44:58 UTC
[iceberg] branch master updated: Spark: Disable aggregate pushdown for incremental scan (#7626)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9d7ef2e9dd Spark: Disable aggregate pushdown for incremental scan (#7626)
9d7ef2e9dd is described below
commit 9d7ef2e9dd27375c2b8bd7a143e4f623118185f8
Author: Huaxin Gao <hu...@apple.com>
AuthorDate: Wed May 17 08:44:51 2023 -0700
Spark: Disable aggregate pushdown for incremental scan (#7626)
---
.../java/org/apache/iceberg/spark/source/SparkScanBuilder.java | 5 +++++
.../org/apache/iceberg/spark/source/TestDataSourceOptions.java | 10 +++++-----
.../java/org/apache/iceberg/spark/source/SparkScanBuilder.java | 5 +++++
.../org/apache/iceberg/spark/source/TestDataSourceOptions.java | 10 +++++-----
4 files changed, 20 insertions(+), 10 deletions(-)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 23cd8524b3..ddeec9c494 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -271,6 +271,11 @@ public class SparkScanBuilder
return false;
}
+ if (readConf.startSnapshotId() != null) {
+ LOG.info("Skipping aggregate pushdown: incremental scan is not supported");
+ return false;
+ }
+
// If group by expression is the same as the partition, the statistics information can still
// be used to calculate min/max/count, will enable aggregate push down in next phase.
// TODO: enable aggregate push down for partition col group by expression
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 60dd716c63..9f4eab5bb9 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -300,17 +300,17 @@ public class TestDataSourceOptions {
Assert.assertEquals("Records should match", expectedRecords.subList(1, 4), result);
// test (2nd snapshot, 3rd snapshot] incremental scan.
- List<SimpleRecord> result1 =
+ Dataset<Row> resultDf =
spark
.read()
.format("iceberg")
.option("start-snapshot-id", snapshotIds.get(2).toString())
.option("end-snapshot-id", snapshotIds.get(1).toString())
- .load(tableLocation)
- .orderBy("id")
- .as(Encoders.bean(SimpleRecord.class))
- .collectAsList();
+ .load(tableLocation);
+ List<SimpleRecord> result1 =
+ resultDf.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Records should match", expectedRecords.subList(2, 3), result1);
+ Assert.assertEquals("Unprocessed count should match record count", 1, resultDf.count());
}
@Test
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 23cd8524b3..ddeec9c494 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -271,6 +271,11 @@ public class SparkScanBuilder
return false;
}
+ if (readConf.startSnapshotId() != null) {
+ LOG.info("Skipping aggregate pushdown: incremental scan is not supported");
+ return false;
+ }
+
// If group by expression is the same as the partition, the statistics information can still
// be used to calculate min/max/count, will enable aggregate push down in next phase.
// TODO: enable aggregate push down for partition col group by expression
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 44400b5ad4..a14e7b500e 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -297,17 +297,17 @@ public class TestDataSourceOptions {
Assert.assertEquals("Records should match", expectedRecords.subList(1, 4), result);
// test (2nd snapshot, 3rd snapshot] incremental scan.
- List<SimpleRecord> result1 =
+ Dataset<Row> resultDf =
spark
.read()
.format("iceberg")
.option("start-snapshot-id", snapshotIds.get(2).toString())
.option("end-snapshot-id", snapshotIds.get(1).toString())
- .load(tableLocation)
- .orderBy("id")
- .as(Encoders.bean(SimpleRecord.class))
- .collectAsList();
+ .load(tableLocation);
+ List<SimpleRecord> result1 =
+ resultDf.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
Assert.assertEquals("Records should match", expectedRecords.subList(2, 3), result1);
+ Assert.assertEquals("Unprocessed count should match record count", 1, resultDf.count());
}
@Test