Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/26 19:50:39 UTC

[GitHub] [spark] aokolnychyi opened a new pull request, #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

aokolnychyi opened a new pull request, #36304:
URL: https://github.com/apache/spark/pull/36304

   
   
   ### What changes were proposed in this pull request?
   
   This PR adds runtime group filtering for group-based row-level operations.
   
   ### Why are the changes needed?
   
   These changes are needed to avoid rewriting unnecessary groups: data skipping during job planning is limited and can still report false-positive groups to rewrite.
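
   As a hypothetical illustration (catalog and table names are invented), the subquery below cannot be converted into a data source filter, so job planning alone may keep every group as a false positive. With runtime group filtering, Spark first runs a query to find the matching rows and rewrites only the groups that contain them.

   ```scala
   // Sketch, assuming a configured catalog `cat`: the IN subquery can only be
   // evaluated by Spark, so static filter pushdown cannot prune groups here.
   spark.sql(
     """DELETE FROM cat.db.orders
       |WHERE customer_id IN (SELECT id FROM cat.db.banned_customers)""".stripMargin)
   ```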
   
   ### Does this PR introduce _any_ user-facing change?
   
   This PR extends `RowLevelOperation`, but the changes are backward compatible: the new method is added with a default implementation.
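
   As a sketch of the compatibility story (rendered in Scala for consistency with other examples here; the real `RowLevelOperation` interface is Java, and the default body below is an assumption, not the actual implementation), existing connectors that only implement `newScanBuilder` keep compiling:

   ```scala
   import org.apache.spark.sql.connector.read.ScanBuilder
   import org.apache.spark.sql.util.CaseInsensitiveStringMap

   trait RowLevelOperationLike {
     // primary scan for the row-level operation (existing API)
     def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder

     // new hook with a default, so older implementations stay source-compatible;
     // delegating to the primary scan builder is an assumed placeholder body
     def newAffectedGroupsScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
       newScanBuilder(options)
   }
   ```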
   
   ### How was this patch tested?
   
   This PR comes with tests.
   



[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r989616739


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuite.scala:
##########
@@ -626,4 +633,142 @@ abstract class DeleteFromTableSuiteBase
   }
 }
 
-class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase
+class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase {

Review Comment:
   Could you create `GroupBasedDeleteFromTableSuite.scala`?




[GitHub] [spark] github-actions[bot] closed pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands
URL: https://github.com/apache/spark/pull/36304



[GitHub] [spark] aokolnychyi commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #36304:
URL: https://github.com/apache/spark/pull/36304#issuecomment-1262447072

   I agree, @rdblue.
   
   Given that versioned data sources can handle it internally (as the same `Table` instance will be used in both scans), I think that's good enough for now. We can implement option 2 or something similar later if needed. We shouldn't block runtime filtering for row-level operations on that. I'll update the PR.



[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r990419267


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuite.scala:
##########
@@ -626,4 +633,142 @@ abstract class DeleteFromTableSuiteBase
   }
 }
 
-class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase
+class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase {

Review Comment:
   @dongjoon-hyun, do you mean move this class into its own file? I can surely do that.




[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r984978969


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -412,6 +412,21 @@ object SQLConf {
       .longConf
       .createWithDefault(67108864L)
 
+  val RUNTIME_ROW_LEVEL_OPERATION_GROUP_FILTER_ENABLED =
+    buildConf("spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled")

Review Comment:
   I went back and forth on the name. On one hand, we have dynamic partition pruning. On the other hand, we call it runtime filtering in DS V2. Ideas are welcome.




[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r989616936


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -412,6 +412,21 @@ object SQLConf {
       .longConf
       .createWithDefault(67108864L)
 
+  val RUNTIME_ROW_LEVEL_OPERATION_GROUP_FILTER_ENABLED =
+    buildConf("spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled")
+      .doc("Enables runtime group filtering for group-based row-level operations. " +
+        "Data sources that replace groups of data (e.g. files, partitions) may prune entire " +
+        "groups using provided data source filters when planning a row-level operation scan. " +
+        "However, such filtering is limited as not all expressions can be converted into data " +
+        "source filters and some expressions can only be evaluated by Spark (e.g. subqueries). " +
+        "Since rewriting groups is expensive, Spark can execute a query at runtime to find what " +
+        "records match the condition of the row-level operation. The information about matching " +
+        "records will be passed back to the row-level operation scan, allowing data sources to " +
+        "discard groups that don't have to be rewritten.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(true)

Review Comment:
   I agree with starting with `true` in this case.
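
   For reference, the flag can be toggled per session; a small sketch using the config name from the diff above:

   ```scala
   // runtime group filtering defaults to true; opt out for a single session
   spark.conf.set("spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled", "false")
   ```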




[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r991702558


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuite.scala:
##########
@@ -626,4 +633,142 @@ abstract class DeleteFromTableSuiteBase
   }
 }
 
-class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase
+class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase {

Review Comment:
   Done. I also renamed the original file, as `DeleteFromTableSuiteBase` is the only class now.





[GitHub] [spark] cloud-fan commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r999648945


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.dynamicpruning
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+
+/**
+ * A rule that assigns a subquery to filter groups in row-level operations at runtime.
+ *
+ * Data skipping during job planning for row-level operations is limited to expressions that can be
+ * converted to data source filters. Since not all expressions can be pushed down that way and
+ * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
+ * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
+ * will inject a subquery to find all rows that match the condition so that data sources know
+ * exactly which groups must be rewritten.
+ *
+ * Note this rule only applies to group-based row-level operations.
+ */
+case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])

Review Comment:
   why do we need to pass the rule as a parameter? Can't we call `OptimizeSubqueries` directly in this rule?




[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r1014850115


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.dynamicpruning
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+
+/**
+ * A rule that assigns a subquery to filter groups in row-level operations at runtime.
+ *
+ * Data skipping during job planning for row-level operations is limited to expressions that can be
+ * converted to data source filters. Since not all expressions can be pushed down that way and
+ * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
+ * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
+ * will inject a subquery to find all rows that match the condition so that data sources know
+ * exactly which groups must be rewritten.
+ *
+ * Note this rule only applies to group-based row-level operations.
+ */
+case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
+  extends Rule[LogicalPlan] with PredicateHelper {
+
+  import DataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    // apply special dynamic filtering only for group-based row-level operations
+    case GroupBasedRowLevelOperation(replaceData, cond,
+        DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _))
+        if conf.runtimeRowLevelOperationGroupFilterEnabled && cond != TrueLiteral =>
+
+      // use reference equality on scan to find required scan relations
+      val newQuery = replaceData.query transformUp {
+        case r: DataSourceV2ScanRelation if r.scan eq scan =>
+          // use the original table instance that was loaded for this row-level operation
+          // in order to leverage a regular batch scan in the group filter query
+          val originalTable = r.relation.table.asRowLevelOperationTable.table
+          val relation = r.relation.copy(table = originalTable)
+          val matchingRowsPlan = buildMatchingRowsPlan(relation, cond)
+
+          val filterAttrs = scan.filterAttributes
+          val buildKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, matchingRowsPlan)
+          val pruningKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, r)
+          val dynamicPruningCond = buildDynamicPruningCond(matchingRowsPlan, buildKeys, pruningKeys)
+
+          Filter(dynamicPruningCond, r)
+      }
+
+      // optimize subqueries to rewrite them as joins and trigger job planning
+      replaceData.copy(query = optimizeSubqueries(newQuery))

Review Comment:
   Not really, @cloud-fan. This rule simply attaches a runtime filter to the plan that was created while rewriting the delete. We do replace the query, but it is pretty much the same plan, just with an extra runtime filter.
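
   As an illustration (catalog and table names invented, output elided): because the rule only attaches a `DynamicPruningSubquery` instead of executing anything during optimization, the whole DELETE shows up as a single plan and EXPLAIN stays cheap.

   ```scala
   // Sketch, assuming a configured catalog `cat`: the optimized plan contains the
   // rewrite plus a dynamic pruning filter on the row-level scan; no query runs
   // until execution.
   spark.sql(
     """EXPLAIN DELETE FROM cat.db.orders
       |WHERE customer_id IN (SELECT id FROM cat.db.banned_customers)""".stripMargin
   ).show(truncate = false)
   ```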




[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r1014850793


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala:
##########
@@ -50,7 +50,8 @@ class SparkOptimizer(
   override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
     Batch("PartitionPruning", Once,
-      PartitionPruning) :+
+      PartitionPruning,
+      RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+

Review Comment:
   This would be much cleaner, but SPARK-36444 removed `OptimizeSubqueries` from that batch. I can add a separate batch for group filtering in row-level operations. How does that sound, @cloud-fan?





[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r1014850975


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.dynamicpruning
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+
+/**
+ * A rule that assigns a subquery to filter groups in row-level operations at runtime.
+ *
+ * Data skipping during job planning for row-level operations is limited to expressions that can be
+ * converted to data source filters. Since not all expressions can be pushed down that way and
+ * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
+ * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
+ * will inject a subquery to find all rows that match the condition so that data sources know
+ * exactly which groups must be rewritten.
+ *
+ * Note this rule only applies to group-based row-level operations.
+ */
+case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])

Review Comment:
   Like I said [here](https://github.com/apache/spark/pull/36304#discussion_r1014850793), we could move the new rule into a separate batch and add `OptimizeSubqueries` to it.




[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r984979702


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -412,6 +412,21 @@ object SQLConf {
       .longConf
       .createWithDefault(67108864L)
 
+  val RUNTIME_ROW_LEVEL_OPERATION_GROUP_FILTER_ENABLED =
+    buildConf("spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled")

Review Comment:
   I also used the `spark.sql.optimizer.runtime` prefix, like for runtime Bloom filter joins. There are other runtime-related configs that don't use this prefix, so let me know the correct config namespace.





[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r855452395


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RowLevelOperation.java:
##########
@@ -69,6 +70,33 @@ default String description() {
    */
   ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
 
+  /**
+   * Returns a {@link ScanBuilder} to configure a {@link Scan} for runtime filtering of groups
+   * that are affected by this row-level operation.
+   * <p>
+   * Data sources that replace groups of data (e.g. files, partitions) may exclude entire groups
+   * using provided data source filters when building the primary scan for this row-level operation.
+   * However, such data skipping is limited as not all expressions can be converted into data source
+   * filters and some can only be evaluated by Spark (e.g. subqueries). Since rewriting groups is
+   * expensive, Spark allows group-based data sources to filter groups at runtime. The runtime
+   * filtering enables data sources to narrow down the scope of rewriting to only groups that must
+   * be rewritten. If the primary scan implements {@link SupportsRuntimeFiltering}, Spark will
+   * dynamically execute a query to find which records match the condition. The information about
+   * these records will be then passed back into the primary scan, allowing data sources to discard
+   * groups that don't have to be rewritten.
+   * <p>
+   * This method allows data sources to provide a dedicated scan builder for group filtering.
+   * Scans built for runtime group filtering are not required to produce all rows in a group
+   * if any are returned. Instead, they can push filters into files (file granularity) or
+   * prune files within partitions (partition granularity).
+   * <p>
+   * Data sources that rely on multi-version concurrency control must ensure the same version of
+   * the table is scanned in both scans.
+   */
+  default ScanBuilder newAffectedGroupsScanBuilder(CaseInsensitiveStringMap options) {

Review Comment:
   I am not entirely happy with the API (naming ideas are welcome), but let me explain my current thoughts.
   
   I like using runtime filtering to filter out groups, as it allows us to benefit from the existing runtime filtering framework and get features like reuse of subqueries for free. The runtime filter is also part of the same plan shown in the Spark UI, so users won't have to guess which Spark jobs belong to the DELETE operation. We also don't have to execute any queries in the optimizer, which makes EXPLAIN fast and descriptive. The rule below shows when a runtime filter is injected. It is fairly straightforward.
   
   Instead of exposing this method, I considered just scanning the original `Table`, which we can access via `RowLevelOperationTable`. However, we need to ensure the same version/snapshot of the table is scanned in the primary and filtering scans. Just reusing the same `Table` instance in both queries does not seem to guarantee the same version of the table will be scanned.
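
   A self-contained sketch of that versioning concern (every type and method below is invented for illustration; this is not the Spark connector API): pinning one snapshot when the operation is created guarantees both scans read the same table version, which merely sharing a `Table` instance does not.

   ```scala
   final case class Snapshot(id: Long)

   trait VersionedTable {
     def currentSnapshot(): Snapshot
     def scan(snapshot: Snapshot, purpose: String): String
   }

   final class PinnedRowLevelOperation(table: VersionedTable) {
     // captured exactly once, reused by both scans below
     private val pinned: Snapshot = table.currentSnapshot()

     def primaryScan(): String     = table.scan(pinned, "rows to rewrite")
     def groupFilterScan(): String = table.scan(pinned, "groups matching the condition")
   }
   ```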






[GitHub] [spark] aokolnychyi commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #36304:
URL: https://github.com/apache/spark/pull/36304#issuecomment-1275312937

   Thank you for reviewing, @dongjoon-hyun @viirya @huaxingao @rdblue!
   
   Also, thanks @cloud-fan for the original discussion we had around this feature.



[GitHub] [spark] cloud-fan commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r999702364


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.dynamicpruning
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+
+/**
+ * A rule that assigns a subquery to filter groups in row-level operations at runtime.
+ *
+ * Data skipping during job planning for row-level operations is limited to expressions that can be
+ * converted to data source filters. Since not all expressions can be pushed down that way and
+ * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
+ * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
+ * will inject a subquery to find all rows that match the condition so that data sources know
+ * exactly which groups must be rewritten.
+ *
+ * Note this rule only applies to group-based row-level operations.
+ */
+case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
+  extends Rule[LogicalPlan] with PredicateHelper {
+
+  import DataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    // apply special dynamic filtering only for group-based row-level operations
+    case GroupBasedRowLevelOperation(replaceData, cond,
+        DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _))
+        if conf.runtimeRowLevelOperationGroupFilterEnabled && cond != TrueLiteral =>
+
+      // use reference equality on scan to find required scan relations
+      val newQuery = replaceData.query transformUp {
+        case r: DataSourceV2ScanRelation if r.scan eq scan =>
+          // use the original table instance that was loaded for this row-level operation
+          // in order to leverage a regular batch scan in the group filter query
+          val originalTable = r.relation.table.asRowLevelOperationTable.table
+          val relation = r.relation.copy(table = originalTable)
+          val matchingRowsPlan = buildMatchingRowsPlan(relation, cond)
+
+          val filterAttrs = scan.filterAttributes
+          val buildKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, matchingRowsPlan)
+          val pruningKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, r)
+          val dynamicPruningCond = buildDynamicPruningCond(matchingRowsPlan, buildKeys, pruningKeys)
+
+          Filter(dynamicPruningCond, r)
+      }
+
+      // optimize subqueries to rewrite them as joins and trigger job planning
+      replaceData.copy(query = optimizeSubqueries(newQuery))

Review Comment:
   Does this mean we revert what we did in `RewriteDeleteFromTable` before?




[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r1014850614


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.dynamicpruning
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+
+/**
+ * A rule that assigns a subquery to filter groups in row-level operations at runtime.
+ *
+ * Data skipping during job planning for row-level operations is limited to expressions that can be
+ * converted to data source filters. Since not all expressions can be pushed down that way and
+ * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
+ * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
+ * will inject a subquery to find all rows that match the condition so that data sources know
+ * exactly which groups must be rewritten.
+ *
+ * Note this rule only applies to group-based row-level operations.
+ */
+case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])

Review Comment:
   @sunchao is correct. It wasn't easy to call `OptimizeSubqueries` outside `Optimizer`. Hence, I had to come up with this workaround.
   
   @cloud-fan, I also considered simply adding `OptimizeSubqueries` to the batch with runtime partition filtering. However, SPARK-36444 specifically removed it from there.




[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r984981870


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.dynamicpruning
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+
+/**
+ * A rule that assigns a subquery to filter groups in row-level operations at runtime.
+ *
+ * Data skipping during job planning for row-level operations is limited to expressions that can be
+ * converted to data source filters. Since not all expressions can be pushed down that way and
+ * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
+ * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
+ * will inject a subquery to find all rows that match the condition so that data sources know
+ * exactly which groups must be rewritten.
+ *
+ * Note this rule only applies to group-based row-level operations.
+ */
+case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
+  extends Rule[LogicalPlan] with PredicateHelper {
+
+  import DataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    // apply special dynamic filtering only for group-based row-level operations
+    case GroupBasedRowLevelOperation(replaceData, cond,
+        DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _))
+        if conf.runtimeRowLevelOperationGroupFilterEnabled && cond != TrueLiteral =>
+
+      // use reference equality on scan to find required scan relations
+      val newQuery = replaceData.query transformUp {
+        case r: DataSourceV2ScanRelation if r.scan eq scan =>
+          // use the original table instance that was loaded for this row-level operation
+          // in order to leverage a regular batch scan in the group filter query
+          val originalTable = r.relation.table.asRowLevelOperationTable.table
+          val relation = r.relation.copy(table = originalTable)

Review Comment:
   We build `DataSourceV2Relation` and trigger planning so the scan can prune columns and push filters into groups.
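   
   As a hedged illustration of the connector side (the class and all bodies here are hypothetical, not part of this PR): once the group filter query is planned against a plain `DataSourceV2Relation`, Spark's regular pushdown rules call the standard `ScanBuilder` mix-ins, which is what lets the source read only the filter attributes and skip groups early.
   
   ```scala
   import org.apache.spark.sql.connector.expressions.filter.Predicate
   import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
   import org.apache.spark.sql.types.StructType
   
   // Hypothetical connector sketch: these hooks are invoked while planning the
   // group filter query, so the scan can read just the needed columns and
   // evaluate pushed predicates at the source.
   class GroupFilterScanBuilder(fullSchema: StructType) extends ScanBuilder
       with SupportsPushDownRequiredColumns
       with SupportsPushDownV2Filters {
   
     private var requiredSchema: StructType = fullSchema
     private var pushed: Array[Predicate] = Array.empty
   
     override def pruneColumns(requiredSchema: StructType): Unit =
       this.requiredSchema = requiredSchema // e.g. only the partition columns
   
     override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
       pushed = predicates // keep what the source can evaluate itself
       Array.empty         // nothing left for Spark to re-check (sketch only)
     }
   
     override def pushedPredicates(): Array[Predicate] = pushed
   
     override def build(): Scan = new Scan {
       override def readSchema(): StructType = requiredSchema
     }
   }
   ```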





[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r984981183


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.dynamicpruning
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+
+/**
+ * A rule that assigns a subquery to filter groups in row-level operations at runtime.
+ *
+ * Data skipping during job planning for row-level operations is limited to expressions that can be
+ * converted to data source filters. Since not all expressions can be pushed down that way and
+ * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
+ * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
+ * will inject a subquery to find all rows that match the condition so that data sources know
+ * exactly which groups must be rewritten.
+ *
+ * Note this rule only applies to group-based row-level operations.
+ */
+case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
+  extends Rule[LogicalPlan] with PredicateHelper {
+
+  import DataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    // apply special dynamic filtering only for group-based row-level operations
+    case GroupBasedRowLevelOperation(replaceData, cond,
+        DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _))

Review Comment:
   This is the optimizer rule that checks whether the primary row-level scan supports runtime filtering. As long as a data source implements `SupportsRuntimeV2Filtering`, that should be enough to benefit from the new functionality.
   
   Also, the runtime group filter uses the existing framework for runtime filtering in DS V2, meaning we get all the benefits like subquery reuse, etc.
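   
   For reference, a minimal sketch of what a connector implements (the class and the `part` column are hypothetical):
   
   ```scala
   import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
   import org.apache.spark.sql.connector.expressions.filter.Predicate
   import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeV2Filtering}
   import org.apache.spark.sql.types.StructType
   
   // Hypothetical primary row-level scan: declares the attributes it can
   // filter on and receives the matching-group predicates at runtime.
   class MyRowLevelScan(schema: StructType) extends Scan with SupportsRuntimeV2Filtering {
   
     override def readSchema(): StructType = schema
   
     // filter groups by partition column (illustrative name)
     override def filterAttributes(): Array[NamedReference] =
       Array(Expressions.column("part"))
   
     override def filter(predicates: Array[Predicate]): Unit = {
       // drop groups (files/partitions) whose values cannot satisfy `predicates`
     }
   }
   ```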





[GitHub] [spark] rdblue commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #36304:
URL: https://github.com/apache/spark/pull/36304#issuecomment-1261555810

   I talked with @aokolnychyi about this and I think this is a data source problem, not something Spark should track right now.
   
   The main problem is that some table sources have multiple versions, and that's not something we're used to handling. Data sources that don't have different versions are not affected, so option 1 is not great because it forces everyone to deal with a problem only a few sources have.
   
   Spark could use option 2 and track this itself, but that complicates the API as well and we don't know that we need it yet. If we do add version/history to Spark then we'd probably want to add `SHOW HISTORY` and things as well.
   
   We've also found a reliable way for option 3 to work. The underlying table instance is the same, so the filter method just needs to check that the table instance has not been refreshed or modified when the runtime filter is applied to it.
   
   I think that option 3 is the simplest approach in terms of new Spark APIs (none!) and is the right way forward until Spark decides to model tables with multiple versions.
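   
   A hedged sketch of that check (the table trait and snapshot accessors are hypothetical connector details, not a Spark API):
   
   ```scala
   import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
   import org.apache.spark.sql.connector.expressions.filter.Predicate
   import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeV2Filtering}
   import org.apache.spark.sql.types.StructType
   
   trait MyVersionedTable { def currentSnapshotId: Long }
   
   class SnapshotPinnedScan(table: MyVersionedTable, schema: StructType)
       extends Scan with SupportsRuntimeV2Filtering {
   
     // remember which snapshot this scan was planned against
     private val plannedSnapshotId: Long = table.currentSnapshotId
   
     override def readSchema(): StructType = schema
   
     override def filterAttributes(): Array[NamedReference] =
       Array(Expressions.column("part"))
   
     override def filter(predicates: Array[Predicate]): Unit = {
       // option 3: refuse to filter if the shared table instance moved on
       require(table.currentSnapshotId == plannedSnapshotId,
         "table was refreshed or modified between planning and runtime filtering")
       // ... prune groups of the pinned snapshot using `predicates`
     }
   }
   ```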




[GitHub] [spark] cloud-fan commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r999716662


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala:
##########
@@ -50,7 +50,8 @@ class SparkOptimizer(
   override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
     Batch("PartitionPruning", Once,
-      PartitionPruning) :+
+      PartitionPruning,
+      RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+

Review Comment:
   I think another idea is to run `OptimizeSubqueries` in this batch:
   ```
   PartitionPruning,
   RowLevelOperationRuntimeGroupFiltering,
   // The rule above may create subqueries, need to optimize them.
   OptimizeSubqueries
   ```





[GitHub] [spark] cloud-fan commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #36304:
URL: https://github.com/apache/spark/pull/36304#issuecomment-1284289408

   This is much simpler than I expected, great design! Sorry for the late review.




[GitHub] [spark] viirya commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r991039734


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala:
##########
@@ -34,6 +34,8 @@ class InMemoryRowLevelOperationTable(
     properties: util.Map[String, String])
   extends InMemoryTable(name, schema, partitioning, properties) with SupportsRowLevelOperations {
 
+  var replacedPartitions: Seq[Seq[Any]] = Seq.empty

Review Comment:
   Maybe add a comment to mention this is for testing.





[GitHub] [spark] sunchao commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
sunchao commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r999654411


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.dynamicpruning
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+
+/**
+ * A rule that assigns a subquery to filter groups in row-level operations at runtime.
+ *
+ * Data skipping during job planning for row-level operations is limited to expressions that can be
+ * converted to data source filters. Since not all expressions can be pushed down that way and
+ * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
+ * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
+ * will inject a subquery to find all rows that match the condition so that data sources know
+ * exactly which groups must be rewritten.
+ *
+ * Note this rule only applies to group-based row-level operations.
+ */
+case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])

Review Comment:
   I also thought about this, but I think it's hard to reference `OptimizeSubqueries` outside `Optimizer`, since the former is more like an "inner class" of the latter and references the enclosing `Optimizer` instance within itself (i.e., `Optimizer.this.execute(Subquery.fromExpression(s))`).
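   
   A simplified illustration of the coupling (toy types, not the real optimizer):
   
   ```scala
   // The subquery rule is path-dependent on its enclosing optimizer instance,
   // so it cannot simply be moved to a separate file.
   abstract class MiniOptimizer {
     def execute(plan: String): String
   
     object OptimizeSubqueries {
       // needs MiniOptimizer.this, just like the real OptimizeSubqueries
       def apply(plan: String): String = MiniOptimizer.this.execute(plan)
     }
   }
   
   // Passing the rule in as a constructor argument, as this PR does, keeps the
   // new rule free of any reference to optimizer internals.
   case class MyGroupFilteringRule(optimizeSubqueries: String => String) {
     def apply(plan: String): String = optimizeSubqueries(plan)
   }
   ```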





[GitHub] [spark] huaxingao commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #36304:
URL: https://github.com/apache/spark/pull/36304#issuecomment-1272856573

   +1, this PR looks good to me. 




[GitHub] [spark] aokolnychyi commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #36304:
URL: https://github.com/apache/spark/pull/36304#issuecomment-1105514003

   @cloud-fan, here is one way to achieve proper group filtering in row-level operations. While I don't target 3.3 with this functionality, I believe it is still important to finish this before 3.3 is officially out. That way, we still have a chance to change the row-level API if needed before it gets released. Currently, it is fully backward compatible.
   
   cc @rdblue @huaxingao @sunchao @viirya @dongjoon-hyun
   
   Some tests are expected to fail due to SPARK-38977. I have another PR #36303 to fix this.




[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r855452395


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RowLevelOperation.java:
##########
@@ -69,6 +70,33 @@ default String description() {
    */
   ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
 
+  /**
+   * Returns a {@link ScanBuilder} to configure a {@link Scan} for runtime filtering of groups
+   * that are affected by this row-level operation.
+   * <p>
+   * Data sources that replace groups of data (e.g. files, partitions) may exclude entire groups
+   * using provided data source filters when building the primary scan for this row-level operation.
+   * However, such data skipping is limited as not all expressions can be converted into data source
+   * filters and some can only be evaluated by Spark (e.g. subqueries). Since rewriting groups is
+   * expensive, Spark allows group-based data sources to filter groups at runtime. The runtime
+   * filtering enables data sources to narrow down the scope of rewriting to only groups that must
+   * be rewritten. If the primary scan implements {@link SupportsRuntimeFiltering}, Spark will
+   * dynamically execute a query to find which records match the condition. The information about
+   * these records will be then passed back into the primary scan, allowing data sources to discard
+   * groups that don't have to be rewritten.
+   * <p>
+   * This method allows data sources to provide a dedicated scan builder for group filtering.
+   * Scans built for runtime group filtering are not required to produce all rows in a group
+   * if any are returned. Instead, they can push filters into files (file granularity) or
+   * prune files within partitions (partition granularity).
+   * <p>
+   * Data sources that rely on multi-version concurrency control must ensure the same version of
+   * the table is scanned in both scans.
+   */
+  default ScanBuilder newAffectedGroupsScanBuilder(CaseInsensitiveStringMap options) {

Review Comment:
   I am not entirely happy with the API (naming ideas are welcome) but let me explain my current thoughts.
   
   I like using runtime filtering to filter out groups as it allows us to benefit from the existing runtime filtering framework and get features like reuse of stages for free. The runtime filter is also part of the same plan shown in the Spark UI. We also don't have to execute any queries in the optimizer, which makes EXPLAIN fast and descriptive. The rule below shows when a runtime filter is injected. It is fairly straightforward.
   
   Instead of exposing this method, I considered just scanning the original `Table` which we can access via `RowLevelOperationTable`. However, we need to ensure the same version/snapshot of the table is scanned in the primary and filtering scans. Just reusing the same `Table` instance in both queries does not seem to guarantee the same version of the table will be scanned.
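   
   For context, a hedged sketch of the connector side under the proposed method (the scan and write builders are elided, and the snapshot plumbing is hypothetical):
   
   ```scala
   import org.apache.spark.sql.connector.read.ScanBuilder
   import org.apache.spark.sql.connector.write.{LogicalWriteInfo, RowLevelOperation, WriteBuilder}
   import org.apache.spark.sql.connector.write.RowLevelOperation.Command
   import org.apache.spark.sql.util.CaseInsensitiveStringMap
   
   class MyDeleteOperation(snapshotId: Long) extends RowLevelOperation {
   
     override def command(): Command = Command.DELETE
   
     // primary scan: must produce all rows of matching groups (pinned to
     // `snapshotId`) so Spark can write back the records that survive
     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ???
   
     // proposed hook: a lighter scan over the same snapshot that only locates
     // matching groups, so it may prune columns and push filters freely
     def newAffectedGroupsScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ???
   
     override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = ???
   }
   ```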





[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r1014855632


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.dynamicpruning
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+
+/**
+ * A rule that assigns a subquery to filter groups in row-level operations at runtime.
+ *
+ * Data skipping during job planning for row-level operations is limited to expressions that can be
+ * converted to data source filters. Since not all expressions can be pushed down that way and
+ * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
+ * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
+ * will inject a subquery to find all rows that match the condition so that data sources know
+ * exactly which groups must be rewritten.
+ *
+ * Note this rule only applies to group-based row-level operations.
+ */
+case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])

Review Comment:
   An alternative idea could be to move `OptimizeSubqueries` into its own file. However, that's tricky too as it calls the optimizer.
   
   ```
   Optimizer.this.execute(Subquery.fromExpression(s))
   ```





[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r1014850975


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.dynamicpruning
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+
+/**
+ * A rule that assigns a subquery to filter groups in row-level operations at runtime.
+ *
+ * Data skipping during job planning for row-level operations is limited to expressions that can be
+ * converted to data source filters. Since not all expressions can be pushed down that way and
+ * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
+ * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
+ * will inject a subquery to find all rows that match the condition so that data sources know
+ * exactly which groups must be rewritten.
+ *
+ * Note this rule only applies to group-based row-level operations.
+ */
+case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])

Review Comment:
   Like I said [here](https://github.com/apache/spark/pull/36304#discussion_r1014850793), we could move the new rule into a separate batch and add `OptimizeSubqueries` to it.





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r990544792


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuite.scala:
##########
@@ -626,4 +633,142 @@ abstract class DeleteFromTableSuiteBase
   }
 }
 
-class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase
+class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase {

Review Comment:
   Yes~





[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r855452395


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RowLevelOperation.java:
##########
@@ -69,6 +70,33 @@ default String description() {
    */
   ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
 
+  /**
+   * Returns a {@link ScanBuilder} to configure a {@link Scan} for runtime filtering of groups
+   * that are affected by this row-level operation.
+   * <p>
+   * Data sources that replace groups of data (e.g. files, partitions) may exclude entire groups
+   * using provided data source filters when building the primary scan for this row-level operation.
+   * However, such data skipping is limited as not all expressions can be converted into data source
+   * filters and some can only be evaluated by Spark (e.g. subqueries). Since rewriting groups is
+   * expensive, Spark allows group-based data sources to filter groups at runtime. The runtime
+   * filtering enables data sources to narrow down the scope of rewriting to only groups that must
+   * be rewritten. If the primary scan implements {@link SupportsRuntimeFiltering}, Spark will
+   * dynamically execute a query to find which records match the condition. The information about
+   * these records will be then passed back into the primary scan, allowing data sources to discard
+   * groups that don't have to be rewritten.
+   * <p>
+   * This method allows data sources to provide a dedicated scan builder for group filtering.
+   * Scans built for runtime group filtering are not required to produce all rows in a group
+   * if any are returned. Instead, they can push filters into files (file granularity) or
+   * prune files within partitions (partition granularity).
+   * <p>
+   * Data sources that rely on multi-version concurrency control must ensure the same version of
+   * the table is scanned in both scans.
+   */
+  default ScanBuilder newAffectedGroupsScanBuilder(CaseInsensitiveStringMap options) {

Review Comment:
   I am not entirely happy with the API but let me explain my current thoughts.
   
   I like using runtime filtering to filter out groups as it allows us to benefit from the existing runtime filtering framework and get features like reuse of stages for free. The runtime filter is also part of the same plan shown in the Spark UI. We also don't have to execute any queries in the optimizer, which makes EXPLAIN fast and descriptive. The rule below shows when a runtime filter is injected. It is fairly straightforward.
   
   Instead of exposing this method, I considered just scanning the original `Table` which we can access via `RowLevelOperationTable`. However, we need to ensure the same version/snapshot of the table is scanned in the primary and filtering scans. Just reusing the same `Table` instance in both queries does not seem to guarantee the same version of the table will be scanned.





[GitHub] [spark] aokolnychyi commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #36304:
URL: https://github.com/apache/spark/pull/36304#issuecomment-1258618345

   I want to resume working on this PR but I need feedback on one point.
   
   In the original implementation, @cloud-fan and I discussed supporting a separate scan builder for runtime group filtering in row-level operations. That way, we can prune columns and push down filters while looking for groups that have matches. We can't do that in the main row-level scan for group-based data sources as non-matching records in matching groups have to be copied over. See PR #35395 for context.
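   
   To make the trade-off concrete, here is a hedged sketch (table and catalog names hypothetical) of what a group-based DELETE does internally:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   object GroupBasedDeleteSketch {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().appName("sketch").getOrCreate()
   
       // the condition contains a subquery, so it cannot be pushed down as a
       // data source filter during job planning
       spark.sql("DELETE FROM cat.db.t WHERE part = 1 AND id IN (SELECT id FROM cat.db.dels)")
   
       // internally (simplified), a group-based source rewrites whole groups:
       //   ReplaceData <- all rows of matching groups with the matches removed
       // which is why the primary scan must return full rows and cannot prune
       // columns, while a separate group-filter scan is free to do both
     }
   }
   ```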
   
   The only challenge is ensuring the same version of the table is scanned in the main row-level scan and in the scan that searches for matching groups to rewrite. There are multiple solutions to consider.
   
   **Option 1**
   
   The first option is shown in this PR. We can add a new method to `RowLevelOperation` that would provide us a scan builder for runtime group filtering.
   
   ```
   interface RowLevelOperation {
     // existing method
     ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
   
     // new method
     default ScanBuilder newAffectedGroupsScanBuilder(CaseInsensitiveStringMap options) {
        return newScanBuilder(options);
     }
   
     ...
   }
   ```
   
   Under this implementation, it is up to data sources to ensure the same version is scanned in both scans. It is a fairly simple approach, but it complicates the row-level API. On top of that, the new method is useless for data sources that can handle a delta of rows.
   
   **Option 2**
   
   The main row-level `Scan` can report the scanned `tableVersion`, and we can use that information to load the correct table version in the rule that assigns a runtime filter. This can be done via `TableCatalog$load(ident, version)`. The only API change is to extend `Scan` with `tableVersion` so we know which table version is being read in the main scan.
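   
   A hedged sketch of Option 2 (`MyVersionedScan` and the extension point are hypothetical; this is a proposal, not the merged API):
   
   ```scala
   import org.apache.spark.sql.connector.read.Scan
   import org.apache.spark.sql.types.StructType
   
   class MyVersionedScan(schema: StructType, version: String) extends Scan {
   
     override def readSchema(): StructType = schema
   
     // proposed extension point: report the table version being scanned so the
     // optimizer rule can reload exactly that version for the group filter
     def tableVersion: String = version
   }
   ```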
   
   **Option 3**
   
   The rule that assigns a runtime group filter has access to the original `Table` object. We could just call `newScanBuilder` on it. However, I don't see anything in the API implying that reusing the `Table` instance guarantees the same version of the table will be scanned. If we call `newScanBuilder` on the same `Table` instance, do we expect the same version to be scanned? Seems like it is NOT the assumption right now.
   
   If we can somehow benefit from reusing the `Table` object, it will be the cleanest option from the API perspective.
   
   Any ideas how to make Option 3 work?
   
   cc @cloud-fan @rdblue @huaxingao @dongjoon-hyun @sunchao @viirya
   
   




[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r991702185


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala:
##########
@@ -34,6 +34,8 @@ class InMemoryRowLevelOperationTable(
     properties: util.Map[String, String])
   extends InMemoryTable(name, schema, partitioning, properties) with SupportsRowLevelOperations {
 
+  var replacedPartitions: Seq[Seq[Any]] = Seq.empty

Review Comment:
   Added a comment above.





[GitHub] [spark] viirya commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r991049796


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuite.scala:
##########
@@ -626,4 +633,142 @@ abstract class DeleteFromTableSuiteBase
   }
 }
 
-class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase
+class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase {

Review Comment:
   +1





[GitHub] [spark] cloud-fan commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r999656305


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.dynamicpruning
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+
+/**
+ * A rule that assigns a subquery to filter groups in row-level operations at runtime.
+ *
+ * Data skipping during job planning for row-level operations is limited to expressions that can be
+ * converted to data source filters. Since not all expressions can be pushed down that way and
+ * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
+ * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
+ * will inject a subquery to find all rows that match the condition so that data sources know
+ * exactly which groups must be rewritten.
+ *
+ * Note this rule only applies to group-based row-level operations.
+ */
+case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
+  extends Rule[LogicalPlan] with PredicateHelper {
+
+  import DataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    // apply special dynamic filtering only for group-based row-level operations
+    case GroupBasedRowLevelOperation(replaceData, cond,
+        DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _))
+        if conf.runtimeRowLevelOperationGroupFilterEnabled && cond != TrueLiteral =>
+
+      // use reference equality on scan to find required scan relations
+      val newQuery = replaceData.query transformUp {
+        case r: DataSourceV2ScanRelation if r.scan eq scan =>
+          // use the original table instance that was loaded for this row-level operation
+          // in order to leverage a regular batch scan in the group filter query
+          val originalTable = r.relation.table.asRowLevelOperationTable.table
+          val relation = r.relation.copy(table = originalTable)
+          val matchingRowsPlan = buildMatchingRowsPlan(relation, cond)
+
+          val filterAttrs = scan.filterAttributes
+          val buildKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, matchingRowsPlan)
+          val pruningKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, r)
+          val dynamicPruningCond = buildDynamicPruningCond(matchingRowsPlan, buildKeys, pruningKeys)
+
+          Filter(dynamicPruningCond, r)
+      }
+
+      // optimize subqueries to rewrite them as joins and trigger job planning
+      replaceData.copy(query = optimizeSubqueries(newQuery))
+  }
+
+  private def buildMatchingRowsPlan(
+      relation: DataSourceV2Relation,
+      cond: Expression): LogicalPlan = {
+
+    val matchingRowsPlan = Filter(cond, relation)
+
+    // clone the relation and assign new expr IDs to avoid conflicts
+    matchingRowsPlan transformUpWithNewOutput {
+      case r: DataSourceV2Relation if r eq relation =>
+        val oldOutput = r.output
+        val newOutput = oldOutput.map(_.newInstance())
+        r.copy(output = newOutput) -> oldOutput.zip(newOutput)

Review Comment:
   nit:
   ```
   val newRelation = r.newInstance
   newRelation -> r.output.zip(newRelation.output)
   ```





[GitHub] [spark] dongjoon-hyun closed pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun closed pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands
URL: https://github.com/apache/spark/pull/36304




[GitHub] [spark] github-actions[bot] commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #36304:
URL: https://github.com/apache/spark/pull/36304#issuecomment-1255683830

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




[GitHub] [spark] aokolnychyi commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #36304:
URL: https://github.com/apache/spark/pull/36304#issuecomment-1284730807

   I will be off until next Monday. I'll address the comments then. Thanks for taking a look, @cloud-fan!




[GitHub] [spark] aokolnychyi commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #36304:
URL: https://github.com/apache/spark/pull/36304#issuecomment-1304020160

   I still plan to follow up on this and the other PR. Slowly getting there.




[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r984981183


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.dynamicpruning
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
+
+/**
+ * A rule that assigns a subquery to filter groups in row-level operations at runtime.
+ *
+ * Data skipping during job planning for row-level operations is limited to expressions that can be
+ * converted to data source filters. Since not all expressions can be pushed down that way and
+ * rewriting groups is expensive, Spark allows data sources to filter groups at runtime.
+ * If the primary scan in a group-based row-level operation supports runtime filtering, this rule
+ * will inject a subquery to find all rows that match the condition so that data sources know
+ * exactly which groups must be rewritten.
+ *
+ * Note this rule only applies to group-based row-level operations.
+ */
+case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
+  extends Rule[LogicalPlan] with PredicateHelper {
+
+  import DataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    // apply special dynamic filtering only for group-based row-level operations
+    case GroupBasedRowLevelOperation(replaceData, cond,
+        DataSourceV2ScanRelation(_, scan: SupportsRuntimeV2Filtering, _, _, _))

Review Comment:
   This is the optimizer rule that checks whether the primary row-level scan supports runtime filtering. As long as a data source implements `SupportsRuntimeV2Filtering`, that should be sufficient to benefit from the new functionality.
   
   Also, the runtime group filter uses the existing framework for runtime filtering in DS V2, meaning we get all the benefits like subquery reuse, etc.





[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r984982201


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RowLevelOperation.java:
##########
@@ -69,6 +70,33 @@ default String description() {
    */
   ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
 
+  /**
+   * Returns a {@link ScanBuilder} to configure a {@link Scan} for runtime filtering of groups
+   * that are affected by this row-level operation.
+   * <p>
+   * Data sources that replace groups of data (e.g. files, partitions) may exclude entire groups
+   * using provided data source filters when building the primary scan for this row-level operation.
+   * However, such data skipping is limited as not all expressions can be converted into data source
+   * filters and some can only be evaluated by Spark (e.g. subqueries). Since rewriting groups is
+   * expensive, Spark allows group-based data sources to filter groups at runtime. The runtime
+   * filtering enables data sources to narrow down the scope of rewriting to only groups that must
+   * be rewritten. If the primary scan implements {@link SupportsRuntimeFiltering}, Spark will
+   * dynamically execute a query to find which records match the condition. The information about
+   * these records will be then passed back into the primary scan, allowing data sources to discard
+   * groups that don't have to be rewritten.
+   * <p>
+   * This method allows data sources to provide a dedicated scan builder for group filtering.
+   * Scans built for runtime group filtering are not required to produce all rows in a group
+   * if any are returned. Instead, they can push filters into files (file granularity) or
+   * prune files within partitions (partition granularity).
+   * <p>
+   * Data sources that rely on multi-version concurrency control must ensure the same version of
+   * the table is scanned in both scans.
+   */
+  default ScanBuilder newAffectedGroupsScanBuilder(CaseInsensitiveStringMap options) {

Review Comment:
   Resolving as it no longer applies.





[GitHub] [spark] dongjoon-hyun commented on pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on PR #36304:
URL: https://github.com/apache/spark/pull/36304#issuecomment-1275253185

   Merged to master for Apache Spark 3.4.0.
   Thank you, @aokolnychyi , @rdblue , @viirya , @huaxingao .
   
   Also, cc @cloud-fan 




[GitHub] [spark] aokolnychyi commented on a diff in pull request #36304: [SPARK-38959][SQL] DS V2: Support runtime group filtering in row-level commands

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #36304:
URL: https://github.com/apache/spark/pull/36304#discussion_r855452395


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/RowLevelOperation.java:
##########
@@ -69,6 +70,33 @@ default String description() {
    */
   ScanBuilder newScanBuilder(CaseInsensitiveStringMap options);
 
+  /**
+   * Returns a {@link ScanBuilder} to configure a {@link Scan} for runtime filtering of groups
+   * that are affected by this row-level operation.
+   * <p>
+   * Data sources that replace groups of data (e.g. files, partitions) may exclude entire groups
+   * using provided data source filters when building the primary scan for this row-level operation.
+   * However, such data skipping is limited as not all expressions can be converted into data source
+   * filters and some can only be evaluated by Spark (e.g. subqueries). Since rewriting groups is
+   * expensive, Spark allows group-based data sources to filter groups at runtime. The runtime
+   * filtering enables data sources to narrow down the scope of rewriting to only groups that must
+   * be rewritten. If the primary scan implements {@link SupportsRuntimeFiltering}, Spark will
+   * dynamically execute a query to find which records match the condition. The information about
+   * these records will then be passed back into the primary scan, allowing data sources to discard
+   * groups that don't have to be rewritten.
+   * <p>
+   * This method allows data sources to provide a dedicated scan builder for group filtering.
+   * Scans built for runtime group filtering are not required to produce all rows in a group
+   * if any are returned. Instead, they can push filters into files (file granularity) or
+   * prune files within partitions (partition granularity).
+   * <p>
+   * Data sources that rely on multi-version concurrency control must ensure the same version of
+   * the table is scanned in both scans.
+   */
+  default ScanBuilder newAffectedGroupsScanBuilder(CaseInsensitiveStringMap options) {

Review Comment:
   I am not entirely happy with the API (naming ideas are welcome), but let me explain my current thoughts.
   
   I like using runtime filtering to filter out groups, as it allows us to benefit from the existing runtime filtering framework and get features like subquery reuse for free. The runtime filter is also part of the same plan shown in the Spark UI, so users won't have to guess which Spark jobs belong to the DELETE operation. We also don't have to execute any queries in the optimizer, which keeps EXPLAIN fast and descriptive. The rule below shows when a runtime filter is injected; it is fairly straightforward.
   
   Instead of exposing this method, I considered just scanning the original `Table`, which we can access via `RowLevelOperationTable`. However, we need to ensure the same version/snapshot of the table is scanned in both the primary and filtering scans, and see the sketch below for why. Just reusing the same `Table` instance in both queries does not seem to guarantee the same version of the table will be scanned.
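   
   To make the versioning concern concrete, here is a hedged sketch (under stated assumptions, not the actual implementation) of a row-level operation that pins a table snapshot when it is created, so every scan and write it hands out reads that snapshot; `MyTable`, `SnapshotScanBuilder`, and `SnapshotWriteBuilder` are hypothetical:
   
   ```java
   import org.apache.spark.sql.connector.read.ScanBuilder;
   import org.apache.spark.sql.connector.write.LogicalWriteInfo;
   import org.apache.spark.sql.connector.write.RowLevelOperation;
   import org.apache.spark.sql.connector.write.WriteBuilder;
   import org.apache.spark.sql.util.CaseInsensitiveStringMap;
   
   class PinnedSnapshotRowLevelOperation implements RowLevelOperation {
   
     private final MyTable table;   // hypothetical table implementation
     private final long snapshotId; // captured once, reused by every scan
   
     PinnedSnapshotRowLevelOperation(MyTable table) {
       this.table = table;
       this.snapshotId = table.currentSnapshotId();
     }
   
     @Override
     public Command command() {
       return Command.DELETE;
     }
   
     @Override
     public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
       // Both the primary scan and any group-filtering scan are built here,
       // so they always read the snapshot captured at construction time.
       return new SnapshotScanBuilder(table, snapshotId, options);
     }
   
     @Override
     public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
       return new SnapshotWriteBuilder(table, snapshotId, info);
     }
   }
   ```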


