Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/01 07:03:04 UTC

(incubator-paimon) 30/46: [flink] Limit pushdown to scan for Flink source (#2420)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit e4612476c0c2a836cc80585214f99874c815c832
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Nov 29 20:49:17 2023 +0800

    [flink] Limit pushdown to scan for Flink source (#2420)
---
 .../org/apache/paimon/table/source/DataSplit.java   | 15 +--------------
 .../paimon/table/source/InnerTableScanImpl.java     | 21 ++++++++++-----------
 .../org/apache/paimon/table/source/RawFile.java     |  2 --
 .../paimon/flink/source/FlinkSourceBuilder.java     |  7 ++++++-
 .../flink/UnawareBucketAppendOnlyTableITCase.java   |  7 +++++++
 5 files changed, 24 insertions(+), 28 deletions(-)
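
For context, this change wires the LIMIT that Flink pushes into FlinkSourceBuilder through to Paimon's ReadBuilder, so the scan can stop planning splits once enough rows are covered. Below is a minimal sketch (not part of the commit) of how a caller could exercise the pushed-down limit through the same public ReadBuilder calls that appear in the diff; the table lookup is left abstract and the helper class name is hypothetical.

    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.source.ReadBuilder;
    import org.apache.paimon.table.source.Split;

    import java.util.List;

    // Sketch only: illustrates the API path this patch uses, not commit code.
    public class LimitPushdownSketch {

        // 'table' is assumed to come from an existing catalog; withLimit(int)
        // is the call that FlinkSourceBuilder now makes when a LIMIT is set.
        public static List<Split> planWithLimit(Table table, int limit) {
            ReadBuilder readBuilder = table.newReadBuilder();
            readBuilder.withLimit(limit);
            // The resulting plan may contain fewer splits than a full scan,
            // because the scan stops once the limit is covered.
            return readBuilder.newScan().plan().splits();
        }
    }
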

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 63110c0bd..047841837 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -42,7 +42,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 /** Input splits. Needed by most batch computation engines. */
 public class DataSplit implements Split {
 
-    private static final long serialVersionUID = 4L;
+    private static final long serialVersionUID = 5L;
 
     private long snapshotId = 0;
     private boolean isStreaming = false;
@@ -207,19 +207,6 @@ public class DataSplit implements Split {
         return new Builder();
     }
 
-    public static Builder builder(DataSplit split) {
-        Builder builder = builder();
-        builder.withSnapshot(split.snapshotId);
-        builder.withPartition(split.partition);
-        builder.withBucket(split.bucket);
-        builder.withBeforeFiles(split.beforeFiles);
-        builder.withDataFiles(split.dataFiles);
-        builder.isStreaming(split.isStreaming);
-        builder.rawFiles(split.rawFiles);
-
-        return builder;
-    }
-
     /** Builder for {@link DataSplit}. */
     public static class Builder {
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index 6dbc6ed32..a30d82f8c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -23,6 +23,7 @@ import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
@@ -88,20 +89,18 @@ public class InnerTableScanImpl extends AbstractInnerTableScan {
     }
 
     private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result) {
-        if (pushDownLimit != null && result instanceof StartingScanner.ScannedResult) {
+        if (pushDownLimit != null && result instanceof ScannedResult) {
             long scannedRowCount = 0;
-            SnapshotReader.Plan plan = ((StartingScanner.ScannedResult) result).plan();
+            SnapshotReader.Plan plan = ((ScannedResult) result).plan();
             List<DataSplit> splits = plan.dataSplits();
-            List<DataSplit> limitedSplits = new ArrayList<>();
-            for (int i = 0; i < splits.size(); i++) {
+            List<Split> limitedSplits = new ArrayList<>();
+            for (DataSplit dataSplit : splits) {
+                long splitRowCount = getRowCountForSplit(dataSplit);
+                limitedSplits.add(dataSplit);
+                scannedRowCount += splitRowCount;
                 if (scannedRowCount >= pushDownLimit) {
                     break;
                 }
-
-                DataSplit split = splits.get(i);
-                long splitRowCount = getRowCountForSplit(split);
-                limitedSplits.add(split);
-                scannedRowCount += splitRowCount;
             }
 
             SnapshotReader.Plan newPlan =
@@ -120,10 +119,10 @@ public class InnerTableScanImpl extends AbstractInnerTableScan {
 
                         @Override
                         public List<Split> splits() {
-                            return (List) limitedSplits;
+                            return limitedSplits;
                         }
                     };
-            return new StartingScanner.ScannedResult(newPlan);
+            return new ScannedResult(newPlan);
         } else {
             return result;
         }
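
As a standalone illustration of the loop above: splits are added whole, and accumulation stops as soon as their combined row count reaches the pushed-down limit, so at least that many rows remain available to the reader (when the table has them). This is a generic sketch, not code from the commit; the row-count function stands in for getRowCountForSplit.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.ToLongFunction;

    // Generic sketch of the greedy split selection used in InnerTableScanImpl.
    public class LimitedSplitsSketch {

        // Keep adding splits until their total row count covers the limit;
        // 'rowCountOf' is a hypothetical stand-in for getRowCountForSplit.
        static <S> List<S> limitSplits(
                List<S> splits, ToLongFunction<S> rowCountOf, long pushDownLimit) {
            List<S> limited = new ArrayList<>();
            long scannedRowCount = 0;
            for (S split : splits) {
                limited.add(split);
                scannedRowCount += rowCountOf.applyAsLong(split);
                if (scannedRowCount >= pushDownLimit) {
                    break; // enough rows covered; remaining splits are dropped
                }
            }
            return limited;
        }
    }
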
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java b/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
index 177b57bfa..5f1a41ffa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
@@ -34,8 +34,6 @@ import java.util.Objects;
 @Public
 public class RawFile {
 
-    private static final long serialVersionUID = 2L;
-
     private final String path;
     private final long offset;
     private final long length;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 215d2b6c6..d8878f360 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -145,7 +145,12 @@ public class FlinkSourceBuilder {
     }
 
     private ReadBuilder createReadBuilder() {
-        return table.newReadBuilder().withProjection(projectedFields).withFilter(predicate);
+        ReadBuilder readBuilder =
+                table.newReadBuilder().withProjection(projectedFields).withFilter(predicate);
+        if (limit != null) {
+            readBuilder.withLimit(limit.intValue());
+        }
+        return readBuilder;
     }
 
     private DataStream<RowData> buildStaticFileSource() {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 51bbe0ffc..a99997a00 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -299,6 +299,13 @@ public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
                 });
     }
 
+    @Test
+    public void testLimit() {
+        sql("INSERT INTO append_table VALUES (1, 'AAA')");
+        sql("INSERT INTO append_table VALUES (2, 'BBB')");
+        assertThat(sql("SELECT * FROM append_table LIMIT 1")).hasSize(1);
+    }
+
     @Override
     protected List<String> ddl() {
         return Arrays.asList(