Posted to issues@paimon.apache.org by "YannByron (via GitHub)" <gi...@apache.org> on 2023/11/22 07:33:13 UTC

[PR] [core][spark] Supports to push down limit [incubator-paimon]

YannByron opened a new pull request, #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367

   <!-- Please specify the module before the PR name: [core] ... or [flink] ... -->
   
   ### Purpose
   Support pushing down the limit to the table scan to accelerate queries.
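
   For illustration (not part of the original description): the optimization targets queries of the following shape. The catalog and table names in this sketch are hypothetical and assume a Paimon catalog is already configured.

   ```java
   import org.apache.spark.sql.SparkSession;

   public class LimitPushdownExample {
       public static void main(String[] args) {
           SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
           // With limit pushdown, the source only needs to plan enough splits to cover 10 rows;
           // without it, every split is planned and Spark discards the excess rows afterwards.
           spark.sql("SELECT * FROM paimon.db.t LIMIT 10").show();
           spark.stop();
       }
   }
   ```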
   
   <!-- Linking this pull request to the issue -->
   Linked issue: close #xxx
   
   <!-- What is the purpose of the change -->
   
   ### Tests
   
   <!-- List UT and IT cases to verify this change -->
   
   ### API and Format
   
   <!-- Does this change affect API or storage format -->
   
   ### Documentation
   
   <!-- Does this change introduce a new feature -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@paimon.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#issuecomment-1831146312

   Another solution: we can make this limit pushdown generic. We can apply the limit in `InnerTableScanImpl` by checking `Split.convertToRawFiles` and the row count there, because raw files don't need to be merged.
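
A minimal Java sketch of this idea (illustrative only, not code from the PR: `DataSplit`, `RawFile.rowCount()` and `DataSplit.convertToRawFiles()` come from this thread, while the helper class itself is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.RawFile;

public final class GenericLimitSketch {

    /** Keep taking splits until their raw-file row counts cover the limit. */
    static List<DataSplit> limitSplits(List<DataSplit> splits, long limit) {
        List<DataSplit> limited = new ArrayList<>();
        long remaining = limit;
        for (DataSplit split : splits) {
            if (remaining <= 0) {
                break;
            }
            // A split whose files still need merging exposes no raw files; its row count is
            // unknown, so it is kept but does not reduce the remaining budget.
            long rows =
                    split.convertToRawFiles()
                            .map(files -> files.stream().mapToLong(RawFile::rowCount).sum())
                            .orElse(0L);
            limited.add(split);
            remaining -= rows;
        }
        return limited;
    }
}
```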




Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#discussion_r1401804685


##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkScanBuilder.scala:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.table.{AppendOnlyFileStoreTable, Table}
+
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+class SparkScanBuilder(table: Table)
+  extends ScanBuilder
+  with SupportsPushDownFilters
+  with SupportsPushDownRequiredColumns
+  with SupportsPushDownLimit {
+
+  private var predicates: Option[Predicate] = None
+
+  private var pushed: Option[Array[Filter]] = None
+
+  private var projectedIndexes: Option[Array[Int]] = None
+
+  private var pushDownLimit: Option[Int] = None
+
+  override def build(): Scan = {
+    val readBuilder = table.newReadBuilder()
+
+    projectedIndexes.foreach(readBuilder.withProjection)
+    predicates.foreach(readBuilder.withFilter)
+    pushDownLimit.foreach(readBuilder.withLimit)
+
+    new SparkScan(table, readBuilder);
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    val converter = new SparkFilterConverter(table.rowType)
+    val predicates: Array[Predicate] = filters.flatMap {
+      filter =>
+        try {
+          Some(converter.convert(filter))
+        } catch {
+          case _: UnsupportedOperationException =>
+            None
+        }
+    }
+    if (predicates.nonEmpty) {
+      this.predicates = Some(PredicateBuilder.and(predicates: _*))
+    }
+    this.pushed = Some(filters)
+    filters
+  }
+
+  override def pushedFilters(): Array[Filter] = {
+    pushed.getOrElse(Array.empty)
+  }
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val pruneFields = requiredSchema.fieldNames
+    val fieldNames = table.rowType.getFieldNames
+    val projected = pruneFields.map(field => fieldNames.indexOf(field))
+    this.projectedIndexes = Some(projected)
+  }
+
+  override def pushLimit(limit: Int): Boolean = {

Review Comment:
   Will Spark SQL avoid pushing down the limit if there is a filter?



##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkScanBuilder.scala:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.table.{AppendOnlyFileStoreTable, Table}
+
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+class SparkScanBuilder(table: Table)
+  extends ScanBuilder
+  with SupportsPushDownFilters
+  with SupportsPushDownRequiredColumns
+  with SupportsPushDownLimit {
+
+  private var predicates: Option[Predicate] = None
+
+  private var pushed: Option[Array[Filter]] = None
+
+  private var projectedIndexes: Option[Array[Int]] = None
+
+  private var pushDownLimit: Option[Int] = None
+
+  override def build(): Scan = {
+    val readBuilder = table.newReadBuilder()
+
+    projectedIndexes.foreach(readBuilder.withProjection)
+    predicates.foreach(readBuilder.withFilter)
+    pushDownLimit.foreach(readBuilder.withLimit)
+
+    new SparkScan(table, readBuilder);
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    val converter = new SparkFilterConverter(table.rowType)
+    val predicates: Array[Predicate] = filters.flatMap {
+      filter =>
+        try {
+          Some(converter.convert(filter))
+        } catch {
+          case _: UnsupportedOperationException =>
+            None
+        }
+    }
+    if (predicates.nonEmpty) {
+      this.predicates = Some(PredicateBuilder.and(predicates: _*))
+    }
+    this.pushed = Some(filters)
+    filters
+  }
+
+  override def pushedFilters(): Array[Filter] = {
+    pushed.getOrElse(Array.empty)
+  }
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val pruneFields = requiredSchema.fieldNames
+    val fieldNames = table.rowType.getFieldNames
+    val projected = pruneFields.map(field => fieldNames.indexOf(field))
+    this.projectedIndexes = Some(projected)
+  }
+
+  override def pushLimit(limit: Int): Boolean = {
+    if (table.isInstanceOf[AppendOnlyFileStoreTable]) {

Review Comment:
   No need for the `if append table` check? The paimon-core scan should take care of this.





Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron closed pull request #2367: [core][spark] Supports to push down limit
URL: https://github.com/apache/incubator-paimon/pull/2367




Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#discussion_r1408664798


##########
paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java:
##########
@@ -199,6 +202,11 @@ public FileStoreScan withMetrics(ScanMetrics metrics) {
         return this;
     }
 
+    @Override
+    public FileStoreScan withLimit(int limit) {

Review Comment:
   Can the limit implementation live only in `AppendOnlyFileStoreScan`? It can override the plan method to apply the limit afterwards.





Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#discussion_r1409055612


##########
paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/SparkScanBuilder.scala:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, PredicateBuilder}
+import org.apache.paimon.table.Table
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.mutable
+
+class SparkScanBuilder(table: Table)

Review Comment:
   Can we have a `SparkBaseScanBuilder` to reuse code?



##########
paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java:
##########
@@ -76,12 +80,18 @@ public long schemaId() {
         return schemaId;
     }
 
+    /** row count of the file. */
+    public long rowCount() {
+        return rowCount;
+    }
+
     public void serialize(DataOutputView out) throws IOException {
         out.writeUTF(path);
         out.writeLong(offset);
         out.writeLong(length);
         out.writeUTF(format);
         out.writeLong(schemaId);
+        out.writeLong(rowCount);

Review Comment:
   Increment `DataSplit.serialVersionUID` too.



##########
paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java:
##########
@@ -68,9 +81,96 @@ public TableScan.Plan plan() {
         if (hasNext) {
             hasNext = false;
             StartingScanner.Result result = startingScanner.scan(snapshotReader);
-            return DataFilePlan.fromResult(result);
+            StartingScanner.Result limitedResult = applyPushDownLimit(result);
+            return DataFilePlan.fromResult(limitedResult);
         } else {
             throw new EndOfScanException();
         }
     }
+
+    private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result) {
+        if (pushDownLimit != null && result instanceof StartingScanner.ScannedResult) {
+            long scannedRowCount = 0;
+            SnapshotReader.Plan plan = ((StartingScanner.ScannedResult) result).plan();
+            List<DataSplit> splits = plan.dataSplits();
+            List<DataSplit> limitedSplits = new ArrayList<>();
+            for (int i = 0; i < splits.size(); i++) {
+                if (scannedRowCount >= pushDownLimit) {
+                    break;
+                }
+
+                DataSplit split = splits.get(i);
+                long splitRowCount = getRowCountForSplit(split);
+                if (scannedRowCount + splitRowCount <= pushDownLimit) {
+                    limitedSplits.add(split);
+                    scannedRowCount += splitRowCount;
+                } else {
+                    DataSplit newSplit = composeDataSplit(split, pushDownLimit - scannedRowCount);

Review Comment:
   I think we don't need to introduce `composeDataSplit`.
   
   The reason we introduce limit pushdown is to reduce the number of splits. We don't need to reduce the number of files within a split.
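
Under that suggestion, the `else` branch quoted above collapses to taking the whole split (a sketch only; variable names follow the quoted code, and slightly overshooting the limit is fine because Spark still applies the exact LIMIT on top):

```java
} else {
    // Take the whole split: the goal is to prune splits, not the files inside a split.
    limitedSplits.add(split);
    scannedRowCount += splitRowCount;
}
```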



##########
paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/SparkScanBuilder.scala:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, PredicateBuilder}
+import org.apache.paimon.table.Table
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.mutable
+
+class SparkScanBuilder(table: Table)

Review Comment:
   Do you need to add this class for spark-3.2 too? And add some IT cases for spark-3.2 as well (for the IT cases, we can also add some abstraction to reduce code duplication).



##########
paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java:
##########
@@ -68,9 +81,96 @@ public TableScan.Plan plan() {
         if (hasNext) {
             hasNext = false;
             StartingScanner.Result result = startingScanner.scan(snapshotReader);
-            return DataFilePlan.fromResult(result);
+            StartingScanner.Result limitedResult = applyPushDownLimit(result);
+            return DataFilePlan.fromResult(limitedResult);
         } else {
             throw new EndOfScanException();
         }
     }
+
+    private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result) {
+        if (pushDownLimit != null && result instanceof StartingScanner.ScannedResult) {
+            long scannedRowCount = 0;
+            SnapshotReader.Plan plan = ((StartingScanner.ScannedResult) result).plan();
+            List<DataSplit> splits = plan.dataSplits();
+            List<DataSplit> limitedSplits = new ArrayList<>();
+            for (int i = 0; i < splits.size(); i++) {
+                if (scannedRowCount >= pushDownLimit) {
+                    break;
+                }
+
+                DataSplit split = splits.get(i);
+                long splitRowCount = getRowCountForSplit(split);
+                if (scannedRowCount + splitRowCount <= pushDownLimit) {
+                    limitedSplits.add(split);
+                    scannedRowCount += splitRowCount;
+                } else {
+                    DataSplit newSplit = composeDataSplit(split, pushDownLimit - scannedRowCount);
+                    limitedSplits.add(newSplit);
+                    scannedRowCount += getRowCountForSplit(newSplit);
+                }
+            }
+
+            SnapshotReader.Plan newPlan =
+                    new SnapshotReader.Plan() {
+                        @Nullable
+                        @Override
+                        public Long watermark() {
+                            return plan.watermark();
+                        }
+
+                        @Nullable
+                        @Override
+                        public Long snapshotId() {
+                            return plan.snapshotId();
+                        }
+
+                        @Override
+                        public List<Split> splits() {
+                            return (List) limitedSplits;
+                        }
+                    };
+            return new StartingScanner.ScannedResult(newPlan);
+        } else {
+            return result;
+        }
+    }
+
+    /**
+     * 0 represents that we can't compute the row count of this split, 'cause this split needs to be
+     * merged.
+     */
+    private long getRowCountForSplit(DataSplit split) {
+        if (split.convertToRawFiles().isPresent()) {
+            return split.convertToRawFiles().get().stream()
+                    .map(RawFile::rowCount)
+                    .reduce(Long::sum)
+                    .orElse(0L);
+        } else {
+            return 0L;
+        }
+    }
+
+    private DataSplit composeDataSplit(DataSplit split, long rowCountRequired) {
+        List<DataFileMeta> dataFiles = new ArrayList<>();
+        List<RawFile> rawFiles = new ArrayList<>();
+        long scannedRowCount = 0;
+
+        List<DataFileMeta> originalDataFiles = split.dataFiles();
+        List<RawFile> originalRawFiles = split.convertToRawFiles().get();

Review Comment:
   Just call `get`? Will this throw an exception when it is empty?
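
A small sketch of the defensive alternative being hinted at (the exception message is illustrative, not from the PR):

```java
// Fail fast with a clear message instead of a bare Optional.get():
List<RawFile> originalRawFiles =
        split.convertToRawFiles()
                .orElseThrow(
                        () -> new IllegalStateException(
                                "Split cannot be converted to raw files, so its rows cannot be counted"));
```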





Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#issuecomment-1827350744

   These failed UTs are not related to this PR.




Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#discussion_r1408664798


##########
paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java:
##########
@@ -199,6 +202,11 @@ public FileStoreScan withMetrics(ScanMetrics metrics) {
         return this;
     }
 
+    @Override
+    public FileStoreScan withLimit(int limit) {

Review Comment:
   Can the limit implementation live only in `AppendOnlyFileStoreScan`?





Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#issuecomment-1829498105

   https://github.com/apache/incubator-paimon/issues/2404






Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#discussion_r1401811137


##########
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSQLPerformanceTest.scala:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.{PaimonSparkTestBase, SparkInputPartition, SparkTable}
+import org.apache.paimon.table.source.DataSplit
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownLimit}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.junit.jupiter.api.Assertions
+
+import scala.collection.JavaConverters._
+
+class PaimonSQLPerformanceTest extends PaimonSparkTestBase {

Review Comment:
   What is this class for? It looks like we cannot get any performance results from these tests.





Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on code in PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#discussion_r1404113126


##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkScanBuilder.scala:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.table.{AppendOnlyFileStoreTable, Table}
+
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+class SparkScanBuilder(table: Table)
+  extends ScanBuilder
+  with SupportsPushDownFilters
+  with SupportsPushDownRequiredColumns
+  with SupportsPushDownLimit {
+
+  private var predicates: Option[Predicate] = None
+
+  private var pushed: Option[Array[Filter]] = None
+
+  private var projectedIndexes: Option[Array[Int]] = None
+
+  private var pushDownLimit: Option[Int] = None
+
+  override def build(): Scan = {
+    val readBuilder = table.newReadBuilder()
+
+    projectedIndexes.foreach(readBuilder.withProjection)
+    predicates.foreach(readBuilder.withFilter)
+    pushDownLimit.foreach(readBuilder.withLimit)
+
+    new SparkScan(table, readBuilder);
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    val converter = new SparkFilterConverter(table.rowType)
+    val predicates: Array[Predicate] = filters.flatMap {
+      filter =>
+        try {
+          Some(converter.convert(filter))
+        } catch {
+          case _: UnsupportedOperationException =>
+            None
+        }
+    }
+    if (predicates.nonEmpty) {
+      this.predicates = Some(PredicateBuilder.and(predicates: _*))
+    }
+    this.pushed = Some(filters)
+    filters
+  }
+
+  override def pushedFilters(): Array[Filter] = {
+    pushed.getOrElse(Array.empty)
+  }
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val pruneFields = requiredSchema.fieldNames
+    val fieldNames = table.rowType.getFieldNames
+    val projected = pruneFields.map(field => fieldNames.indexOf(field))
+    this.projectedIndexes = Some(projected)
+  }
+
+  override def pushLimit(limit: Int): Boolean = {
+    if (table.isInstanceOf[AppendOnlyFileStoreTable]) {

Review Comment:
   As a temporary solution, it's ok.





Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi commented on code in PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#discussion_r1403920073


##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkScanBuilder.scala:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.table.{AppendOnlyFileStoreTable, Table}
+
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+class SparkScanBuilder(table: Table)
+  extends ScanBuilder
+  with SupportsPushDownFilters
+  with SupportsPushDownRequiredColumns
+  with SupportsPushDownLimit {
+
+  private var predicates: Option[Predicate] = None
+
+  private var pushed: Option[Array[Filter]] = None
+
+  private var projectedIndexes: Option[Array[Int]] = None
+
+  private var pushDownLimit: Option[Int] = None
+
+  override def build(): Scan = {
+    val readBuilder = table.newReadBuilder()
+
+    projectedIndexes.foreach(readBuilder.withProjection)
+    predicates.foreach(readBuilder.withFilter)
+    pushDownLimit.foreach(readBuilder.withLimit)
+
+    new SparkScan(table, readBuilder);
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    val converter = new SparkFilterConverter(table.rowType)
+    val predicates: Array[Predicate] = filters.flatMap {
+      filter =>
+        try {
+          Some(converter.convert(filter))
+        } catch {
+          case _: UnsupportedOperationException =>
+            None
+        }
+    }
+    if (predicates.nonEmpty) {
+      this.predicates = Some(PredicateBuilder.and(predicates: _*))
+    }
+    this.pushed = Some(filters)
+    filters
+  }
+
+  override def pushedFilters(): Array[Filter] = {
+    pushed.getOrElse(Array.empty)
+  }
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val pruneFields = requiredSchema.fieldNames
+    val fieldNames = table.rowType.getFieldNames
+    val projected = pruneFields.map(field => fieldNames.indexOf(field))
+    this.projectedIndexes = Some(projected)
+  }
+
+  override def pushLimit(limit: Int): Boolean = {
+    if (table.isInstanceOf[AppendOnlyFileStoreTable]) {

Review Comment:
   Can we just return false here, as a best-effort pushdown?
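
A hedged Java rendering of that suggestion (the PR's actual method is the Scala `pushLimit` quoted above; the field is the Scala builder's `pushDownLimit`, the exact contract of the boolean belongs to Spark, and returning `false` is the conservative reading of "best effort"):

```java
@Override
public boolean pushLimit(int limit) {
    // Best-effort: always remember the limit so the scan can plan fewer splits, but report it
    // as not pushed so Spark keeps its own Limit operator and correctness never depends on
    // how precisely the source prunes.
    this.pushDownLimit = limit;
    return false;
}
```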





Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on code in PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#discussion_r1409110877


##########
paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/SparkScanBuilder.scala:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, PredicateBuilder}
+import org.apache.paimon.table.Table
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.mutable
+
+class SparkScanBuilder(table: Table)

Review Comment:
   For the UTs, just copy the code for now. Let's refine it in another PR.





Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on code in PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#discussion_r1401842151


##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkScanBuilder.scala:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.table.{AppendOnlyFileStoreTable, Table}
+
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+class SparkScanBuilder(table: Table)
+  extends ScanBuilder
+  with SupportsPushDownFilters
+  with SupportsPushDownRequiredColumns
+  with SupportsPushDownLimit {
+
+  private var predicates: Option[Predicate] = None
+
+  private var pushed: Option[Array[Filter]] = None
+
+  private var projectedIndexes: Option[Array[Int]] = None
+
+  private var pushDownLimit: Option[Int] = None
+
+  override def build(): Scan = {
+    val readBuilder = table.newReadBuilder()
+
+    projectedIndexes.foreach(readBuilder.withProjection)
+    predicates.foreach(readBuilder.withFilter)
+    pushDownLimit.foreach(readBuilder.withLimit)
+
+    new SparkScan(table, readBuilder);
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    val converter = new SparkFilterConverter(table.rowType)
+    val predicates: Array[Predicate] = filters.flatMap {
+      filter =>
+        try {
+          Some(converter.convert(filter))
+        } catch {
+          case _: UnsupportedOperationException =>
+            None
+        }
+    }
+    if (predicates.nonEmpty) {
+      this.predicates = Some(PredicateBuilder.and(predicates: _*))
+    }
+    this.pushed = Some(filters)
+    filters
+  }
+
+  override def pushedFilters(): Array[Filter] = {
+    pushed.getOrElse(Array.empty)
+  }
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val pruneFields = requiredSchema.fieldNames
+    val fieldNames = table.rowType.getFieldNames
+    val projected = pruneFields.map(field => fieldNames.indexOf(field))
+    this.projectedIndexes = Some(projected)
+  }
+
+  override def pushLimit(limit: Int): Boolean = {
+    if (table.isInstanceOf[AppendOnlyFileStoreTable]) {

Review Comment:
   Here, a `boolean` return value is needed because Spark SQL uses it. Otherwise, paimon-core would need to provide an API that can be called to make this decision.





Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "JingsongLi (via GitHub)" <gi...@apache.org>.
JingsongLi merged PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367




Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on code in PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#discussion_r1401846360


##########
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonSQLPerformanceTest.scala:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.{PaimonSparkTestBase, SparkInputPartition, SparkTable}
+import org.apache.paimon.table.source.DataSplit
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownLimit}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.junit.jupiter.api.Assertions
+
+import scala.collection.JavaConverters._
+
+class PaimonSQLPerformanceTest extends PaimonSparkTestBase {

Review Comment:
   Maybe it's not a good class name. It aims to verify that certain optimizations actually take effect, like `SupportsPushDownLimit`, which can reduce the number of splits scanned.
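
A hedged sketch of the kind of assertion such a test can make, written against paimon's public read API rather than the Spark test base the PR uses (`withLimit` is the method added by this PR; the helper class and method names below are hypothetical):

```java
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;

public final class LimitPushdownCheck {

    /** Returns true when planning with a limit yields no more splits than planning without one. */
    static boolean limitReducesSplits(Table table) {
        int allSplits = table.newReadBuilder().newScan().plan().splits().size();

        ReadBuilder limited = table.newReadBuilder();
        limited.withLimit(1);
        int limitedSplits = limited.newScan().plan().splits().size();

        return limitedSplits <= allSplits;
    }
}
```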





Re: [PR] [core][spark] Supports to push down limit [incubator-paimon]

Posted by "YannByron (via GitHub)" <gi...@apache.org>.
YannByron commented on code in PR #2367:
URL: https://github.com/apache/incubator-paimon/pull/2367#discussion_r1401842923


##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkScanBuilder.scala:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.table.{AppendOnlyFileStoreTable, Table}
+
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+class SparkScanBuilder(table: Table)
+  extends ScanBuilder
+  with SupportsPushDownFilters
+  with SupportsPushDownRequiredColumns
+  with SupportsPushDownLimit {
+
+  private var predicates: Option[Predicate] = None
+
+  private var pushed: Option[Array[Filter]] = None
+
+  private var projectedIndexes: Option[Array[Int]] = None
+
+  private var pushDownLimit: Option[Int] = None
+
+  override def build(): Scan = {
+    val readBuilder = table.newReadBuilder()
+
+    projectedIndexes.foreach(readBuilder.withProjection)
+    predicates.foreach(readBuilder.withFilter)
+    pushDownLimit.foreach(readBuilder.withLimit)
+
+    new SparkScan(table, readBuilder);
+  }
+
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    val converter = new SparkFilterConverter(table.rowType)
+    val predicates: Array[Predicate] = filters.flatMap {
+      filter =>
+        try {
+          Some(converter.convert(filter))
+        } catch {
+          case _: UnsupportedOperationException =>
+            None
+        }
+    }
+    if (predicates.nonEmpty) {
+      this.predicates = Some(PredicateBuilder.and(predicates: _*))
+    }
+    this.pushed = Some(filters)
+    filters
+  }
+
+  override def pushedFilters(): Array[Filter] = {
+    pushed.getOrElse(Array.empty)
+  }
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val pruneFields = requiredSchema.fieldNames
+    val fieldNames = table.rowType.getFieldNames
+    val projected = pruneFields.map(field => fieldNames.indexOf(field))
+    this.projectedIndexes = Some(projected)
+  }
+
+  override def pushLimit(limit: Int): Boolean = {

Review Comment:
   Yes. Spark SQL can guarantee this.


