You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/12/01 07:03:04 UTC
(incubator-paimon) 30/46: [flink] Limit pushdown to scan for Flink source (#2420)
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit e4612476c0c2a836cc80585214f99874c815c832
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Nov 29 20:49:17 2023 +0800
[flink] Limit pushdown to scan for Flink source (#2420)
---
.../org/apache/paimon/table/source/DataSplit.java | 15 +--------------
.../paimon/table/source/InnerTableScanImpl.java | 21 ++++++++++-----------
.../org/apache/paimon/table/source/RawFile.java | 2 --
.../paimon/flink/source/FlinkSourceBuilder.java | 7 ++++++-
.../flink/UnawareBucketAppendOnlyTableITCase.java | 7 +++++++
5 files changed, 24 insertions(+), 28 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 63110c0bd..047841837 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -42,7 +42,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Input splits. Needed by most batch computation engines. */
public class DataSplit implements Split {
- private static final long serialVersionUID = 4L;
+ private static final long serialVersionUID = 5L;
private long snapshotId = 0;
private boolean isStreaming = false;
@@ -207,19 +207,6 @@ public class DataSplit implements Split {
return new Builder();
}
- public static Builder builder(DataSplit split) {
- Builder builder = builder();
- builder.withSnapshot(split.snapshotId);
- builder.withPartition(split.partition);
- builder.withBucket(split.bucket);
- builder.withBeforeFiles(split.beforeFiles);
- builder.withDataFiles(split.dataFiles);
- builder.isStreaming(split.isStreaming);
- builder.rawFiles(split.rawFiles);
-
- return builder;
- }
-
/** Builder for {@link DataSplit}. */
public static class Builder {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index 6dbc6ed32..a30d82f8c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -23,6 +23,7 @@ import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
+import org.apache.paimon.table.source.snapshot.StartingScanner.ScannedResult;
import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -88,20 +89,18 @@ public class InnerTableScanImpl extends AbstractInnerTableScan {
}
private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result) {
- if (pushDownLimit != null && result instanceof StartingScanner.ScannedResult) {
+ if (pushDownLimit != null && result instanceof ScannedResult) {
long scannedRowCount = 0;
- SnapshotReader.Plan plan = ((StartingScanner.ScannedResult) result).plan();
+ SnapshotReader.Plan plan = ((ScannedResult) result).plan();
List<DataSplit> splits = plan.dataSplits();
- List<DataSplit> limitedSplits = new ArrayList<>();
- for (int i = 0; i < splits.size(); i++) {
+ List<Split> limitedSplits = new ArrayList<>();
+ for (DataSplit dataSplit : splits) {
+ long splitRowCount = getRowCountForSplit(dataSplit);
+ limitedSplits.add(dataSplit);
+ scannedRowCount += splitRowCount;
if (scannedRowCount >= pushDownLimit) {
break;
}
-
- DataSplit split = splits.get(i);
- long splitRowCount = getRowCountForSplit(split);
- limitedSplits.add(split);
- scannedRowCount += splitRowCount;
}
SnapshotReader.Plan newPlan =
@@ -120,10 +119,10 @@ public class InnerTableScanImpl extends AbstractInnerTableScan {
@Override
public List<Split> splits() {
- return (List) limitedSplits;
+ return limitedSplits;
}
};
- return new StartingScanner.ScannedResult(newPlan);
+ return new ScannedResult(newPlan);
} else {
return result;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java b/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
index 177b57bfa..5f1a41ffa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
@@ -34,8 +34,6 @@ import java.util.Objects;
@Public
public class RawFile {
- private static final long serialVersionUID = 2L;
-
private final String path;
private final long offset;
private final long length;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 215d2b6c6..d8878f360 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -145,7 +145,12 @@ public class FlinkSourceBuilder {
}
private ReadBuilder createReadBuilder() {
- return table.newReadBuilder().withProjection(projectedFields).withFilter(predicate);
+ ReadBuilder readBuilder =
+ table.newReadBuilder().withProjection(projectedFields).withFilter(predicate);
+ if (limit != null) {
+ readBuilder.withLimit(limit.intValue());
+ }
+ return readBuilder;
}
private DataStream<RowData> buildStaticFileSource() {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index 51bbe0ffc..a99997a00 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -299,6 +299,13 @@ public class UnawareBucketAppendOnlyTableITCase extends CatalogITCaseBase {
});
}
+ @Test
+ public void testLimit() {
+ sql("INSERT INTO append_table VALUES (1, 'AAA')");
+ sql("INSERT INTO append_table VALUES (2, 'BBB')");
+ assertThat(sql("SELECT * FROM append_table LIMIT 1")).hasSize(1);
+ }
+
@Override
protected List<String> ddl() {
return Arrays.asList(