Posted to reviews@spark.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/05/24 20:40:16 UTC

[GitHub] [spark] aokolnychyi opened a new pull request, #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

aokolnychyi opened a new pull request, #41300:
URL: https://github.com/apache/spark/pull/41300

   
   ### What changes were proposed in this pull request?
   
   This PR adds a way for data sources to request Spark to represent updates as deletes and inserts. 
   
   ### Why are the changes needed?
   
   It may be beneficial for delta-based data source implementations to represent updates as deletes and inserts. Specifically, this makes it possible to properly distribute and order records before writing.
   
   Delete records set only row ID and metadata attributes. Update records set data, row ID, and metadata attributes. Insert records set only data attributes.
   
   For instance, a data source may rely on a synthetic, internally generated metadata column `_row_id` to identify rows and may be partitioned by `bucket(product_id)`. Splitting updates into deletes and inserts allows the data source to cluster all update and insert records in MERGE that belong to the same partition into a single task. Otherwise, the clustering key for updates and inserts would differ (inserts would always have a null `_row_id`, as it is a metadata column). The new functionality is critical for reducing the number of generated files. It is also useful in UPDATE operations when a record's original and new partitions do not match. A sketch of the split is shown below.
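   
   As an illustration of the record shapes above, here is a minimal, self-contained sketch; the column names are hypothetical and the plain case class merely stands in for Spark's internal row representation:
   
   ```scala
   // Hypothetical record with data attributes (productId, price), a synthetic
   // row ID (_row_id), and a metadata attribute (partition).
   case class Record(
       operation: String,
       productId: Option[Int],     // data attribute
       price: Option[Double],      // data attribute
       rowId: Option[Long],        // synthetic _row_id metadata column
       partition: Option[String])  // metadata attribute
   
   // An UPDATE record sets all attribute groups; splitting it yields a DELETE
   // record (row ID and metadata only) and an INSERT record (data only, with
   // a null _row_id just like a regular insert).
   def split(update: Record): Seq[Record] = Seq(
     Record("DELETE", None, None, update.rowId, update.partition),
     Record("INSERT", update.productId, update.price, None, None))
   ```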
   
   ### Does this PR introduce _any_ user-facing change?
   
   This PR adds a new method to `SupportsDelta`, but the change is backward compatible.
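   
   As a rough sketch, the opt-in hook could look like the following; the method name and its default value are assumptions here (not the confirmed API), chosen so that existing implementations keep compiling and behaving as before:
   
   ```scala
   // Hypothetical sketch of the connector-facing hook on SupportsDelta.
   trait SupportsDelta {
     // ... existing methods, e.g. the row ID spec ...
   
     // When true, Spark rewrites each UPDATE as a DELETE plus an INSERT
     // before handing records to the delta writer. Defaults to false so
     // that existing implementations are unaffected.
     def representUpdateAsDeleteAndInsert: Boolean = false
   }
   ```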
    
   ### How was this patch tested?
   
   This PR comes with tests.
   




[GitHub] [spark] aokolnychyi commented on a diff in pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #41300:
URL: https://github.com/apache/spark/pull/41300#discussion_r1210627366


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SplitUpdateAsDeleteAndInsert.scala:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}
+import org.apache.spark.sql.catalyst.util.truncatedString
+
+case class SplitUpdateAsDeleteAndInsert(

Review Comment:
   Switched to `Expand`, will follow up with improvements later.
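   
   For context, a minimal sketch of what the `Expand`-based rewrite amounts to; the parameters stand in for the projection-building logic and are assumptions:
   
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
   import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan}
   
   // Each matched row is emitted twice: once through the delete projection
   // (row ID + metadata attributes, data attributes nulled) and once through
   // the insert projection (data attributes, row ID + metadata nulled).
   def splitViaExpand(
       deleteOutput: Seq[Expression],
       insertOutput: Seq[Expression],
       output: Seq[Attribute],
       child: LogicalPlan): Expand = {
     Expand(Seq(deleteOutput, insertOutput), output, child)
   }
   ```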





[GitHub] [spark] aokolnychyi commented on a diff in pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #41300:
URL: https://github.com/apache/spark/pull/41300#discussion_r1204905927


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SplitUpdateAsDeleteAndInsert.scala:
##########
@@ -0,0 +1,40 @@
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}
+import org.apache.spark.sql.catalyst.util.truncatedString
+
+case class SplitUpdateAsDeleteAndInsert(

Review Comment:
   Okay, I do think this node is a bit more efficient than `Expand`, as it acts almost like `Project`:
   
   - No need to call `copy()` on each row.
   - Subexpression elimination, as in `ProjectExec`.
   - Deferred evaluation of input attributes until they are actually needed, as in `ProjectExec`.
   
   What are your thoughts, is it worth having such a node? Technically, `Expand` would work too, but it would be less efficient and would probably require more memory. This use case does not require a generic approach like the one in `Expand`.
   
   cc @dongjoon-hyun @viirya @cloud-fan @sunchao @huaxingao
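   
   For reference, a minimal sketch of the interpreted path of such a node, assuming the fields from the quoted diff; the two dedicated projections with separate buffers are what make the per-row `copy()` unnecessary:
   
   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
   
   // Emits exactly two rows per input row. deleteProj and insertProj keep
   // separate buffers, so both emitted rows stay valid until the next input
   // row is pulled.
   private def processPartition(rows: Iterator[InternalRow]): Iterator[InternalRow] = {
     val deleteProj = UnsafeProjection.create(deleteOutput, child.output)
     val insertProj = UnsafeProjection.create(insertOutput, child.output)
     rows.flatMap(row => Iterator(deleteProj(row), insertProj(row)))
   }
   ```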





[GitHub] [spark] aokolnychyi commented on pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #41300:
URL: https://github.com/apache/spark/pull/41300#issuecomment-1570697919

   Thanks, @dongjoon-hyun @cloud-fan @viirya!




[GitHub] [spark] aokolnychyi commented on a diff in pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #41300:
URL: https://github.com/apache/spark/pull/41300#discussion_r1204766585


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SplitUpdateAsDeleteAndInsertExec.scala:
##########
@@ -0,0 +1,134 @@
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, BindReferences, Expression, Projection, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.util.truncatedString
+
+case class SplitUpdateAsDeleteAndInsertExec(
+    deleteOutput: Seq[Expression],
+    insertOutput: Seq[Expression],
+    output: Seq[Attribute],
+    child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+
+  @transient override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"SplitUpdateAsDeleteAndInsertExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions(processPartition)
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
+    copy(child = newChild)
+  }
+
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val exprs = insertOutput ++ deleteOutput
+    val exprIds = exprs.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = exprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {

Review Comment:
   Checking if I need to also override `needCopyResult`.
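   
   For reference, the override in question would be a one-liner; whether `false` is actually safe is exactly the open question here:
   
   ```scala
   // Sketch: with a dedicated UnsafeRowWriter per output branch (see the
   // sample codegen output later in this thread), the rows passed to append()
   // never share a buffer, so the parent would not need to copy. A single
   // shared writer would require returning true instead.
   override def needCopyResult: Boolean = false
   ```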





[GitHub] [spark] aokolnychyi commented on a diff in pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #41300:
URL: https://github.com/apache/spark/pull/41300#discussion_r1204767183


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateAsDeleteAndInsertTableSuite.scala:
##########
@@ -0,0 +1,57 @@
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.{AnalysisException, Row}
+
+class DeltaBasedUpdateAsDeleteAndInsertTableSuite extends UpdateTableSuiteBase {

Review Comment:
   I wonder whether I should have two copies of this suite (with/without codegen).
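   
   For reference, a sketch of one usual pattern, assuming the base suite exposes `sparkConf` via its shared-session trait; the derived suite name is hypothetical:
   
   ```scala
   import org.apache.spark.SparkConf
   import org.apache.spark.sql.internal.SQLConf
   
   // Hypothetical second copy of the suite that disables whole-stage codegen,
   // so that both the codegen and the interpreted paths get exercised.
   class DeltaBasedUpdateAsDeleteAndInsertTableCodegenDisabledSuite
       extends DeltaBasedUpdateAsDeleteAndInsertTableSuite {
   
     override protected def sparkConf: SparkConf = super.sparkConf
       .set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
       .set(SQLConf.CODEGEN_FACTORY_MODE.key, "NO_CODEGEN")
   }
   ```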





[GitHub] [spark] aokolnychyi commented on a diff in pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #41300:
URL: https://github.com/apache/spark/pull/41300#discussion_r1204918849


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SplitUpdateAsDeleteAndInsert.scala:
##########
@@ -0,0 +1,40 @@
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}
+import org.apache.spark.sql.catalyst.util.truncatedString
+
+case class SplitUpdateAsDeleteAndInsert(

Review Comment:
   Maybe we can implement some of those optimizations for `Expand` instead of adding another node. I am inclined towards using `Expand`, but let me know what everyone thinks.






[GitHub] [spark] dongjoon-hyun closed pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun closed pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts
URL: https://github.com/apache/spark/pull/41300




[GitHub] [spark] dongjoon-hyun commented on pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #41300:
URL: https://github.com/apache/spark/pull/41300#issuecomment-1570494144

   Thank you again, @aokolnychyi, @cloud-fan, @viirya.
   Merged to master for Apache Spark 3.5.0.





[GitHub] [spark] aokolnychyi commented on pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #41300:
URL: https://github.com/apache/spark/pull/41300#issuecomment-1568843062

   @dongjoon-hyun, it looks like something weird happened. I switched to `Expand` and rebased to pick up recent changes. Let's see whether the tests pass this time. The PR should be ready for a detailed review.




[GitHub] [spark] dongjoon-hyun commented on pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #41300:
URL: https://github.com/apache/spark/pull/41300#issuecomment-1565192479

   The re-trigger seems to have failed, @aokolnychyi. Do you have a GitHub Actions link for your commit?




[GitHub] [spark] cloud-fan commented on a diff in pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41300:
URL: https://github.com/apache/spark/pull/41300#discussion_r1210540009


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SplitUpdateAsDeleteAndInsert.scala:
##########
@@ -0,0 +1,40 @@
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}
+import org.apache.spark.sql.catalyst.util.truncatedString
+
+case class SplitUpdateAsDeleteAndInsert(

Review Comment:
   I'm in favor of reusing `Expand`, so that we get its codegen and/or vectorization (if people install certain Spark plugins) for free.






[GitHub] [spark] aokolnychyi commented on a diff in pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #41300:
URL: https://github.com/apache/spark/pull/41300#discussion_r1204826928


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SplitUpdateAsDeleteAndInsertExec.scala:
##########
@@ -0,0 +1,134 @@
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, BindReferences, Expression, Projection, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.util.truncatedString
+
+case class SplitUpdateAsDeleteAndInsertExec(
+    deleteOutput: Seq[Expression],
+    insertOutput: Seq[Expression],
+    output: Seq[Attribute],
+    child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+
+  @transient override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"SplitUpdateAsDeleteAndInsertExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions(processPartition)
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
+    copy(child = newChild)
+  }
+
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val exprs = insertOutput ++ deleteOutput
+    val exprIds = exprs.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = exprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {

Review Comment:
   Here is what a sample output looks like.
   
   ```
   Generated code:
   /* 001 */ public Object generate(Object[] references) {
   /* 002 */   return new GeneratedIteratorForCodegenStage1(references);
   /* 003 */ }
   /* 004 */
   /* 005 */ // codegenStageId=1
   /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
   /* 007 */   private Object[] references;
   /* 008 */   private scala.collection.Iterator[] inputs;
   /* 009 */   private scala.collection.Iterator inputadapter_input_0;
   /* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] project_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[6];
   /* 011 */
   /* 012 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
   /* 013 */     this.references = references;
   /* 014 */   }
   /* 015 */
   /* 016 */   public void init(int index, scala.collection.Iterator[] inputs) {
   /* 017 */     partitionIndex = index;
   /* 018 */     this.inputs = inputs;
   /* 019 */     inputadapter_input_0 = inputs[0];
   /* 020 */     project_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(7, 64);
   /* 021 */     project_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_mutableStateArray_0[0], 0);
   /* 022 */     project_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(8, 64);
   /* 023 */     project_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_mutableStateArray_0[2], 0);
   /* 024 */     project_mutableStateArray_0[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(8, 64);
   /* 025 */     project_mutableStateArray_0[5] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(project_mutableStateArray_0[4], 0);
   /* 026 */
   /* 027 */   }
   /* 028 */
   /* 029 */   protected void processNext() throws java.io.IOException {
   /* 030 */     while ( inputadapter_input_0.hasNext()) {
   /* 031 */       InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
   /* 032 */
   /* 033 */       // common sub-expressions
   /* 034 */
   /* 035 */       boolean inputadapter_isNull_1 = inputadapter_row_0.isNullAt(1);
   /* 036 */       int inputadapter_value_1 = inputadapter_isNull_1 ?
   /* 037 */       -1 : (inputadapter_row_0.getInt(1));
   /* 038 */
   /* 039 */       // generate DELETE record
   /* 040 */
   /* 041 */       UTF8String inputadapter_value_3 = inputadapter_row_0.getUTF8String(3);
   /* 042 */       long inputadapter_value_4 = inputadapter_row_0.getLong(4);
   /* 043 */       int inputadapter_value_5 = inputadapter_row_0.getInt(5);
   /* 044 */       boolean inputadapter_isNull_6 = inputadapter_row_0.isNullAt(6);
   /* 045 */       InternalRow inputadapter_value_6 = inputadapter_isNull_6 ?
   /* 046 */       null : (inputadapter_row_0.getStruct(6, 0));
   /* 047 */       project_mutableStateArray_0[2].reset();
   /* 048 */
   /* 049 */       project_mutableStateArray_0[2].zeroOutNullBytes();
   /* 050 */
   /* 051 */       project_mutableStateArray_0[2].write(0, 1);
   /* 052 */
   /* 053 */       if (true) {
   /* 054 */         project_mutableStateArray_0[2].setNullAt(1);
   /* 055 */       } else {
   /* 056 */         project_mutableStateArray_0[2].write(1, -1);
   /* 057 */       }
   /* 058 */
   /* 059 */       if (true) {
   /* 060 */         project_mutableStateArray_0[2].setNullAt(2);
   /* 061 */       } else {
   /* 062 */         project_mutableStateArray_0[2].write(2, -1);
   /* 063 */       }
   /* 064 */
   /* 065 */       if (true) {
   /* 066 */         project_mutableStateArray_0[2].setNullAt(3);
   /* 067 */       } else {
   /* 068 */         project_mutableStateArray_0[2].write(3, -1);
   /* 069 */       }
   /* 070 */
   /* 071 */       if (false) {
   /* 072 */         project_mutableStateArray_0[2].setNullAt(4);
   /* 073 */       } else {
   /* 074 */         project_mutableStateArray_0[2].write(4, inputadapter_value_3);
   /* 075 */       }
   /* 076 */
   /* 077 */       if (false) {
   /* 078 */         project_mutableStateArray_0[2].setNullAt(5);
   /* 079 */       } else {
   /* 080 */         project_mutableStateArray_0[2].write(5, inputadapter_value_4);
   /* 081 */       }
   /* 082 */
   /* 083 */       if (false) {
   /* 084 */         project_mutableStateArray_0[2].setNullAt(6);
   /* 085 */       } else {
   /* 086 */         project_mutableStateArray_0[2].write(6, inputadapter_value_5);
   /* 087 */       }
   /* 088 */
   /* 089 */       if (inputadapter_isNull_6) {
   /* 090 */         project_mutableStateArray_0[2].setNullAt(7);
   /* 091 */       } else {
   /* 092 */         final InternalRow updaterows_tmpInput_0 = inputadapter_value_6;
   /* 093 */         if (updaterows_tmpInput_0 instanceof UnsafeRow) {
   /* 094 */           project_mutableStateArray_0[2].write(7, (UnsafeRow) updaterows_tmpInput_0);
   /* 095 */         } else {
   /* 096 */           // Remember the current cursor so that we can calculate how many bytes are
   /* 097 */           // written later.
   /* 098 */           final int updaterows_previousCursor_0 = project_mutableStateArray_0[2].cursor();
   /* 099 */
   /* 100 */           project_mutableStateArray_0[3].resetRowWriter();
   /* 101 */
   /* 102 */           project_mutableStateArray_0[2].setOffsetAndSizeFromPreviousCursor(7, updaterows_previousCursor_0);
   /* 103 */         }
   /* 104 */       }
   /* 105 */       append((project_mutableStateArray_0[2].getRow()));
   /* 106 */
   /* 107 */       // generate INSERT records
   /* 108 */
   /* 109 */       boolean updaterows_isNull_8 = true;
   /* 110 */       int updaterows_value_8 = -1;
   /* 111 */
   /* 112 */       if (!inputadapter_isNull_1) {
   /* 113 */         updaterows_isNull_8 = false; // resultCode could change nullability.
   /* 114 */
   /* 115 */         updaterows_value_8 = inputadapter_value_1 - 11;
   /* 116 */
   /* 117 */       }
   /* 118 */
   /* 119 */       boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
   /* 120 */       int inputadapter_value_0 = inputadapter_isNull_0 ?
   /* 121 */       -1 : (inputadapter_row_0.getInt(0));
   /* 122 */       project_mutableStateArray_0[4].reset();
   /* 123 */
   /* 124 */       project_mutableStateArray_0[4].zeroOutNullBytes();
   /* 125 */
   /* 126 */       project_mutableStateArray_0[4].write(0, 3);
   /* 127 */
   /* 128 */       if (inputadapter_isNull_0) {
   /* 129 */         project_mutableStateArray_0[4].setNullAt(1);
   /* 130 */       } else {
   /* 131 */         project_mutableStateArray_0[4].write(1, inputadapter_value_0);
   /* 132 */       }
   /* 133 */
   /* 134 */       if (updaterows_isNull_8) {
   /* 135 */         project_mutableStateArray_0[4].setNullAt(2);
   /* 136 */       } else {
   /* 137 */         project_mutableStateArray_0[4].write(2, updaterows_value_8);
   /* 138 */       }
   /* 139 */
   /* 140 */       if (updaterows_isNull_8) {
   /* 141 */         project_mutableStateArray_0[4].setNullAt(3);
   /* 142 */       } else {
   /* 143 */         project_mutableStateArray_0[4].write(3, updaterows_value_8);
   /* 144 */       }
   /* 145 */
   /* 146 */       if (true) {
   /* 147 */         project_mutableStateArray_0[4].setNullAt(4);
   /* 148 */       } else {
   /* 149 */         project_mutableStateArray_0[4].write(4, ((UTF8String)null));
   /* 150 */       }
   /* 151 */
   /* 152 */       if (true) {
   /* 153 */         project_mutableStateArray_0[4].setNullAt(5);
   /* 154 */       } else {
   /* 155 */         project_mutableStateArray_0[4].write(5, -1L);
   /* 156 */       }
   /* 157 */
   /* 158 */       if (true) {
   /* 159 */         project_mutableStateArray_0[4].setNullAt(6);
   /* 160 */       } else {
   /* 161 */         project_mutableStateArray_0[4].write(6, -1);
   /* 162 */       }
   /* 163 */
   /* 164 */       if (true) {
   /* 165 */         project_mutableStateArray_0[4].setNullAt(7);
   /* 166 */       } else {
   /* 167 */         final InternalRow wholestagecodegen_tmpInput_0 = ((InternalRow)null);
   /* 168 */         if (wholestagecodegen_tmpInput_0 instanceof UnsafeRow) {
   /* 169 */           project_mutableStateArray_0[4].write(7, (UnsafeRow) wholestagecodegen_tmpInput_0);
   /* 170 */         } else {
   /* 171 */           // Remember the current cursor so that we can calculate how many bytes are
   /* 172 */           // written later.
   /* 173 */           final int wholestagecodegen_previousCursor_0 = project_mutableStateArray_0[4].cursor();
   /* 174 */
   /* 175 */           project_mutableStateArray_0[5].resetRowWriter();
   /* 176 */
   /* 177 */           project_mutableStateArray_0[4].setOffsetAndSizeFromPreviousCursor(7, wholestagecodegen_previousCursor_0);
   /* 178 */         }
   /* 179 */       }
   /* 180 */       append((project_mutableStateArray_0[4].getRow()));
   /* 181 */       if (shouldStop()) return;
   /* 182 */     }
   /* 183 */   }
   /* 184 */
   /* 185 */ }
   ```





[GitHub] [spark] aokolnychyi commented on a diff in pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #41300:
URL: https://github.com/apache/spark/pull/41300#discussion_r1204874511


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SplitUpdateAsDeleteAndInsert.scala:
##########
@@ -0,0 +1,40 @@
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}
+import org.apache.spark.sql.catalyst.util.truncatedString
+
+case class SplitUpdateAsDeleteAndInsert(

Review Comment:
   Maybe I can use `Expand`/`ExpandExec`. Let me explore this to see whether there are any performance implications.





[GitHub] [spark] aokolnychyi commented on a diff in pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #41300:
URL: https://github.com/apache/spark/pull/41300#discussion_r1210626979


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SplitUpdateAsDeleteAndInsertExec.scala:
##########
@@ -0,0 +1,134 @@
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, BindReferences, Expression, Projection, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.util.truncatedString
+
+case class SplitUpdateAsDeleteAndInsertExec(
+    deleteOutput: Seq[Expression],
+    insertOutput: Seq[Expression],
+    output: Seq[Attribute],
+    child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+
+  @transient override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"SplitUpdateAsDeleteAndInsertExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions(processPartition)
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
+    copy(child = newChild)
+  }
+
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val exprs = insertOutput ++ deleteOutput
+    val exprIds = exprs.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = exprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {

Review Comment:
   No longer applies.





[GitHub] [spark] aokolnychyi commented on a diff in pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #41300:
URL: https://github.com/apache/spark/pull/41300#discussion_r1204795655


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SplitUpdateAsDeleteAndInsertExec.scala:
##########
@@ -0,0 +1,134 @@
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, BindReferences, Expression, Projection, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.util.truncatedString
+
+case class SplitUpdateAsDeleteAndInsertExec(
+    deleteOutput: Seq[Expression],
+    insertOutput: Seq[Expression],
+    output: Seq[Attribute],
+    child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+
+  @transient override lazy val producedAttributes: AttributeSet = {
+    AttributeSet(output.filterNot(attr => inputSet.contains(attr)))
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"SplitUpdateAsDeleteAndInsertExec${truncatedString(output, "[", ", ", "]", maxFields)}"
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    child.execute().mapPartitions(processPartition)
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
+    copy(child = newChild)
+  }
+
+  override def usedInputs: AttributeSet = {
+    // only attributes used at least twice should be evaluated before this plan,
+    // otherwise defer the evaluation until an attribute is actually used
+    val exprs = insertOutput ++ deleteOutput
+    val exprIds = exprs.flatMap(_.collect {
+      case attr: Attribute => attr.exprId
+    })
+    val usedMoreThanOnceExprIds = exprIds.groupBy(id => id).filter(_._2.size > 1).keySet
+    references.filter(attr => usedMoreThanOnceExprIds.contains(attr.exprId))
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {

Review Comment:
   On the one hand, we do produce two records for each input record. On the other hand, we have separate unsafe row writer instances for deletes and inserts. It seems safe not to copy, then?





[GitHub] [spark] aokolnychyi commented on a diff in pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #41300:
URL: https://github.com/apache/spark/pull/41300#discussion_r1204778595


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala:
##########
@@ -91,6 +91,33 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
     rowIdAttrs
   }
 
+  protected def deltaDeleteOutput(

Review Comment:
   Added to the parent class so that it can be reused in MERGE later.
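   
   For reference, a hedged sketch of what such a helper plausibly produces; the quoted diff truncates the body, so the exact signature and the operation-type literal (1 = DELETE in the sample codegen output) are assumptions:
   
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Literal}
   
   // Following the record shapes from the PR description: a DELETE record
   // sets only row ID and metadata attributes, while the data attributes
   // are emitted as typed nulls.
   protected def deltaDeleteOutput(
       rowAttrs: Seq[Attribute],
       rowIdAttrs: Seq[Attribute],
       metadataAttrs: Seq[Attribute]): Seq[Expression] = {
     val nulledRowValues = rowAttrs.map(attr => Literal(null, attr.dataType))
     Seq(Literal(1)) ++ nulledRowValues ++ rowIdAttrs ++ metadataAttrs
   }
   ```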






[GitHub] [spark] aokolnychyi closed pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi closed pull request #41300: [SPARK-43775][SQL] DataSource V2: Allow representing updates as deletes and inserts
URL: https://github.com/apache/spark/pull/41300

