You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/05/17 15:44:58 UTC

[iceberg] branch master updated: Spark: Disable aggregate pushdown for incremental scan (#7626)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d7ef2e9dd Spark: Disable aggregate pushdown for incremental scan (#7626)
9d7ef2e9dd is described below

commit 9d7ef2e9dd27375c2b8bd7a143e4f623118185f8
Author: Huaxin Gao <hu...@apple.com>
AuthorDate: Wed May 17 08:44:51 2023 -0700

    Spark: Disable aggregate pushdown for incremental scan (#7626)
---
 .../java/org/apache/iceberg/spark/source/SparkScanBuilder.java |  5 +++++
 .../org/apache/iceberg/spark/source/TestDataSourceOptions.java | 10 +++++-----
 .../java/org/apache/iceberg/spark/source/SparkScanBuilder.java |  5 +++++
 .../org/apache/iceberg/spark/source/TestDataSourceOptions.java | 10 +++++-----
 4 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 23cd8524b3..ddeec9c494 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -271,6 +271,11 @@ public class SparkScanBuilder
       return false;
     }
 
+    if (readConf.startSnapshotId() != null) {
+      LOG.info("Skipping aggregate pushdown: incremental scan is not supported");
+      return false;
+    }
+
     // If group by expression is the same as the partition, the statistics information can still
     // be used to calculate min/max/count, will enable aggregate push down in next phase.
     // TODO: enable aggregate push down for partition col group by expression
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 60dd716c63..9f4eab5bb9 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -300,17 +300,17 @@ public class TestDataSourceOptions {
     Assert.assertEquals("Records should match", expectedRecords.subList(1, 4), result);
 
     // test (2nd snapshot, 3rd snapshot] incremental scan.
-    List<SimpleRecord> result1 =
+    Dataset<Row> resultDf =
         spark
             .read()
             .format("iceberg")
             .option("start-snapshot-id", snapshotIds.get(2).toString())
             .option("end-snapshot-id", snapshotIds.get(1).toString())
-            .load(tableLocation)
-            .orderBy("id")
-            .as(Encoders.bean(SimpleRecord.class))
-            .collectAsList();
+            .load(tableLocation);
+    List<SimpleRecord> result1 =
+        resultDf.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
     Assert.assertEquals("Records should match", expectedRecords.subList(2, 3), result1);
+    Assert.assertEquals("Unprocessed count should match record count", 1, resultDf.count());
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 23cd8524b3..ddeec9c494 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -271,6 +271,11 @@ public class SparkScanBuilder
       return false;
     }
 
+    if (readConf.startSnapshotId() != null) {
+      LOG.info("Skipping aggregate pushdown: incremental scan is not supported");
+      return false;
+    }
+
     // If group by expression is the same as the partition, the statistics information can still
     // be used to calculate min/max/count, will enable aggregate push down in next phase.
     // TODO: enable aggregate push down for partition col group by expression
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 44400b5ad4..a14e7b500e 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -297,17 +297,17 @@ public class TestDataSourceOptions {
     Assert.assertEquals("Records should match", expectedRecords.subList(1, 4), result);
 
     // test (2nd snapshot, 3rd snapshot] incremental scan.
-    List<SimpleRecord> result1 =
+    Dataset<Row> resultDf =
         spark
             .read()
             .format("iceberg")
             .option("start-snapshot-id", snapshotIds.get(2).toString())
             .option("end-snapshot-id", snapshotIds.get(1).toString())
-            .load(tableLocation)
-            .orderBy("id")
-            .as(Encoders.bean(SimpleRecord.class))
-            .collectAsList();
+            .load(tableLocation);
+    List<SimpleRecord> result1 =
+        resultDf.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
     Assert.assertEquals("Records should match", expectedRecords.subList(2, 3), result1);
+    Assert.assertEquals("Unprocessed count should match record count", 1, resultDf.count());
   }
 
   @Test