Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/26 21:19:48 UTC

[GitHub] [spark] aokolnychyi opened a new pull request, #38005: [SPARK-40550][SQL] Handle DELETE commands for delta-based sources

aokolnychyi opened a new pull request, #38005:
URL: https://github.com/apache/spark/pull/38005

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   This WIP PR shows how the API added in PR #38004 can be implemented.
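
   As a rough usage sketch (hedged: the catalog name and implementation class below are hypothetical, and assume a catalog whose tables implement `SupportsDelta`), a plain SQL DELETE is what ultimately exercises this code path:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   object DeleteFromDeltaBasedSource {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .master("local[*]")
         // "testcat" and its implementation class are illustrative names,
         // not part of this PR.
         .config("spark.sql.catalog.testcat", "com.example.TestDeltaCatalog")
         .getOrCreate()
   
       // The analyzer rewrites this DELETE into a WriteDelta plan that encodes
       // one delete per matching row, keyed by the source's row ID columns.
       spark.sql("DELETE FROM testcat.ns.tbl WHERE id < 10")
       spark.stop()
     }
   }
   ```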
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   These changes are needed as per the SPIP in SPARK-35801.
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   Yes, this PR adds new DS v2 APIs.
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   
   This PR comes with tests.
   




[GitHub] [spark] aokolnychyi commented on a diff in pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1022088980


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -254,6 +254,113 @@ case class ReplaceData(
   }
 }
 
+/**
+ * Writes a delta of rows to an existing table during a row-level operation.
+ *
+ * This node is constructed in rules that rewrite DELETE, UPDATE, MERGE operations for data sources
+ * that can handle deltas of rows.
+ *
+ * @param table a plan that references a row-level operation table
+ * @param condition a condition that defines matching records
+ * @param query a query with a delta of records that should be written
+ * @param originalTable a plan for the original table for which the row-level command was triggered
+ * @param projections projections for row ID, row, metadata attributes
+ * @param write a logical write, if already constructed
+ */
+case class WriteDelta(
+    table: NamedRelation,
+    condition: Expression,
+    query: LogicalPlan,
+    originalTable: NamedRelation,
+    projections: WriteDeltaProjections,
+    write: Option[DeltaWrite] = None) extends RowLevelWrite {
+
+  override val isByName: Boolean = false
+  override val stringArgs: Iterator[Any] = Iterator(table, query, write)
+
+  override lazy val references: AttributeSet = query.outputSet
+
+  lazy val operation: SupportsDelta = {
+    EliminateSubqueryAliases(table) match {
+      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+        operation.asInstanceOf[SupportsDelta]
+      case _ =>
+        throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
+    }
+  }
+
+  override def outputResolved: Boolean = {
+    assert(table.resolved && query.resolved,
+      "`outputResolved` can only be called when `table` and `query` are both resolved.")
+
+    operationResolved && rowAttrsResolved && rowIdAttrsResolved && metadataAttrsResolved
+  }
+
+  private def operationResolved: Boolean = {
+    val attr = query.output.head
+    attr.name == RowDeltaUtils.OPERATION_COLUMN && attr.dataType == IntegerType && !attr.nullable
+  }
+
+  // validates row projection output is compatible with table attributes
+  private def rowAttrsResolved: Boolean = {
+    table.skipSchemaResolution || (projections.rowProjection match {

Review Comment:
   I am using projection schemas instead of `query.output` because those schemas carry precise nullability.
   
   Once we add support for MERGE operations, there will be a node that merges matching incoming and existing rows into one. That means a single `SparkPlan` would contain deletes, updates, and inserts. Metadata and row ID attributes in `query` will always be nullable because those columns are null for insert rows. However, row IDs and metadata are never passed to sources with insert rows in `DeltaWriter`. That's why we need to look at the projections, not `query`. The projections carry the exact nullability that sources will see.
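   
   A minimal sketch of the nullability point (the schemas below are illustrative, not taken from this PR):
   
   ```scala
   import org.apache.spark.sql.types._
   
   object ProjectionNullabilitySketch extends App {
     // What query.output would report in a merged MERGE plan: the row ID is
     // nullable because insert rows carry null row IDs.
     val queryOutput = StructType(Seq(
       StructField("__row_operation", IntegerType, nullable = false),
       StructField("id", LongType, nullable = true),
       StructField("value", StringType, nullable = true)))
   
     // What the row ID projection reports: only delete/update rows flow
     // through it, so it carries the precise, non-nullable type.
     val rowIdProjectionSchema = StructType(Seq(
       StructField("id", LongType, nullable = false)))
   
     println(queryOutput("id").nullable)           // true
     println(rowIdProjectionSchema("id").nullable) // false
   }
   ```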





[GitHub] [spark] viirya commented on a diff in pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1081781360


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -274,6 +274,120 @@ case class ReplaceData(
   }
 }
 
+/**
+ * Writes a delta of rows to an existing table during a row-level operation.
+ *
+ * This node references a query that translates a logical DELETE, UPDATE, MERGE operation into
+ * a set of row-level changes to be encoded in the table. Each row in the query represents either
+ * a delete, update or insert and stores the operation type in a special column.
+ *
+ * This node is constructed in rules that rewrite DELETE, UPDATE, MERGE operations for data sources
+ * that can handle deltas of rows.
+ *
+ * @param table a plan that references a row-level operation table
+ * @param condition a condition that defines matching records
+ * @param query a query with a delta of records that should be written
+ * @param originalTable a plan for the original table for which the row-level command was triggered
+ * @param projections projections for row ID, row, metadata attributes
+ * @param write a logical write, if already constructed
+ */
+case class WriteDelta(
+    table: NamedRelation,
+    condition: Expression,
+    query: LogicalPlan,
+    originalTable: NamedRelation,
+    projections: WriteDeltaProjections,
+    write: Option[DeltaWrite] = None) extends RowLevelWrite {
+
+  override val isByName: Boolean = false
+  override val stringArgs: Iterator[Any] = Iterator(table, query, write)
+
+  override lazy val references: AttributeSet = query.outputSet
+
+  lazy val operation: SupportsDelta = {
+    EliminateSubqueryAliases(table) match {
+      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+        operation.asInstanceOf[SupportsDelta]
+      case _ =>
+        throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
+    }
+  }
+
+  override def outputResolved: Boolean = {
+    assert(table.resolved && query.resolved,
+      "`outputResolved` can only be called when `table` and `query` are both resolved.")
+
+    operationResolved && rowAttrsResolved && rowIdAttrsResolved && metadataAttrsResolved
+  }
+
+  private def operationResolved: Boolean = {
+    val attr = query.output.head
+    attr.name == RowDeltaUtils.OPERATION_COLUMN && attr.dataType == IntegerType && !attr.nullable
+  }
+
+  // validates row projection output is compatible with table attributes
+  private def rowAttrsResolved: Boolean = {
+    table.skipSchemaResolution || (projections.rowProjection match {
+      case Some(projection) =>
+        table.output.size == projection.schema.size &&
+          projection.schema.zip(table.output).forall { case (field, outAttr) =>
+            isCompatible(field, outAttr)
+          }
+      case None =>
+        true
+    })
+  }
+
+  // validates row ID projection output is compatible with row ID attributes
+  private def rowIdAttrsResolved: Boolean = {
+    val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+      operation.rowId,
+      originalTable)
+
+    val projectionSchema = projections.rowIdProjection.schema
+    rowIdAttrs.size == projectionSchema.size && projectionSchema.forall { field =>
+      rowIdAttrs.exists(rowIdAttr => isCompatible(field, rowIdAttr))
+    }
+  }
+
+  // validates metadata projection output is compatible with metadata attributes
+  private def metadataAttrsResolved: Boolean = {
+    projections.metadataProjection match {
+      case Some(projection) =>
+        val metadataAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+          operation.requiredMetadataAttributes,
+          originalTable)
+
+        val projectionSchema = projection.schema
+        metadataAttrs.size == projectionSchema.size && projectionSchema.forall { field =>
+          metadataAttrs.exists(metadataAttr => isCompatible(field, metadataAttr))
+        }
+      case None =>
+        true
+    }
+  }
+
+  private def isCompatible(projectionField: StructField, outAttr: NamedExpression): Boolean = {
+    val inType = CharVarcharUtils.getRawType(projectionField.metadata).getOrElse(outAttr.dataType)

Review Comment:
   I think for `inType`, you should get datatype from `projectionField`?





[GitHub] [spark] aokolnychyi commented on pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #38005:
URL: https://github.com/apache/spark/pull/38005#issuecomment-1314802801

   I don't think the test failure is related. Let me re-trigger.
   
   ```
   SPARK-33084: Add jar support Ivy URI in SQL *** FAILED *** 
   ```





[GitHub] [spark] aokolnychyi commented on pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #38005:
URL: https://github.com/apache/spark/pull/38005#issuecomment-1397327118

   I've updated this PR and its description so it is ready for another look.




[GitHub] [spark] aokolnychyi commented on a diff in pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1081825442


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala:
##########
@@ -477,6 +507,73 @@ object DataWritingSparkTask extends Logging {
   }
 }
 
+object DataWritingSparkTask extends WritingSparkTask[DataWriter[InternalRow]] {
+  override protected def write(writer: DataWriter[InternalRow], row: InternalRow): Unit = {
+    writer.write(row)
+  }
+}
+
+case class DeltaWritingSparkTask(
+    projections: WriteDeltaProjections) extends WritingSparkTask[DeltaWriter[InternalRow]] {
+
+  private lazy val rowProjection = projections.rowProjection.orNull
+  private lazy val rowIdProjection = projections.rowIdProjection
+
+  override protected def write(writer: DeltaWriter[InternalRow], row: InternalRow): Unit = {
+    val operation = row.getInt(0)
+
+    operation match {
+      case DELETE_OPERATION =>
+        rowIdProjection.project(row)
+        writer.delete(null, rowIdProjection)
+
+      case UPDATE_OPERATION =>
+        rowProjection.project(row)
+        rowIdProjection.project(row)
+        writer.update(null, rowIdProjection, rowProjection)
+
+      case INSERT_OPERATION =>
+        rowProjection.project(row)
+        writer.insert(rowProjection)

Review Comment:
   You are correct, those are not used just yet, but they are part of the contract for writing deltas defined by the SPIP. That's why I went ahead and added them. I can remove this logic until we have rules to handle UPDATE and MERGE, if you think that would be better. Let me know.
   
   Once we add support for UPDATE and MERGE, we will have `update` and `insert` rows as well.
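   
   For reference, a minimal sketch of the sink side of that contract (hedged: `LoggingDeltaWriter` is a made-up example built against the `DeltaWriter` interface proposed in PR #38004):
   
   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.connector.write.{DeltaWriter, WriterCommitMessage}
   
   // A toy DeltaWriter that just records which operation it received.
   class LoggingDeltaWriter extends DeltaWriter[InternalRow] {
     override def delete(metadata: InternalRow, id: InternalRow): Unit =
       println(s"delete, row id = $id")
   
     override def update(metadata: InternalRow, id: InternalRow, row: InternalRow): Unit =
       println(s"update, row id = $id, new row = $row")
   
     override def insert(row: InternalRow): Unit =
       println(s"insert, row = $row")
   
     // Plain writes are not expected for delta-based plans; route to insert.
     override def write(row: InternalRow): Unit = insert(row)
   
     override def commit(): WriterCommitMessage = new WriterCommitMessage {}
     override def abort(): Unit = {}
     override def close(): Unit = {}
   }
   ```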





[GitHub] [spark] aokolnychyi commented on a diff in pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1022079865


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ProjectingInternalRow.scala:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal, StructType}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+/**
+ * An [[InternalRow]] that projects particular columns from another [[InternalRow]] without copying
+ * the underlying data.
+ */
+case class ProjectingInternalRow(schema: StructType, colOrdinals: Seq[Int]) extends InternalRow {

Review Comment:
   This is a class that allows me to project the row, row ID, and metadata attributes from a single row without copying the underlying data.
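   
   A small usage sketch (ordinals and values made up): `project` re-points the wrapper at a new backing row, so reads go straight through without materializing a copy.
   
   ```scala
   import org.apache.spark.sql.catalyst.ProjectingInternalRow
   import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
   import org.apache.spark.sql.types.{LongType, StructField, StructType}
   import org.apache.spark.unsafe.types.UTF8String
   
   // Backing row layout: (operation, id, value); project only the row ID.
   val idSchema = StructType(Seq(StructField("id", LongType, nullable = false)))
   val rowIdProjection = ProjectingInternalRow(idSchema, colOrdinals = Seq(1))
   
   val row = new GenericInternalRow(
     Array[Any](1, 42L, UTF8String.fromString("a")))
   
   rowIdProjection.project(row)
   assert(rowIdProjection.getLong(0) == 42L) // reads ordinal 1 of the backing row
   ```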





[GitHub] [spark] aokolnychyi commented on a diff in pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1022090308


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.AnalysisException
+
+class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase {

Review Comment:
   We inherit lots of tests from the parent suite.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1024312221


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala:
##########
@@ -477,6 +507,73 @@ object DataWritingSparkTask extends Logging {
   }
 }
 
+object DataWritingSparkTask extends WritingSparkTask[DataWriter[InternalRow]] {
+  override protected def write(writer: DataWriter[InternalRow], row: InternalRow): Unit = {
+    writer.write(row)
+  }
+}
+
+case class DeltaWritingSparkTask(
+    projections: WriteDeltaProjections) extends WritingSparkTask[DeltaWriter[InternalRow]] {
+
+  private lazy val rowProjection = projections.rowProjection.orNull
+  private lazy val rowIdProjection = projections.rowIdProjection
+
+  override protected def write(writer: DeltaWriter[InternalRow], row: InternalRow): Unit = {
+    val operation = row.getInt(0)
+
+    operation match {
+      case DELETE_OPERATION =>
+        rowIdProjection.project(row)
+        writer.delete(null, rowIdProjection)

Review Comment:
   don't we need to pass the metadata row?





[GitHub] [spark] dongjoon-hyun commented on pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on PR #38005:
URL: https://github.com/apache/spark/pull/38005#issuecomment-1397734988

   All tests passed. Merged to master for Apache Spark 3.4.0.
   
   ![Screen Shot 2023-01-19 at 3 19 10 PM](https://user-images.githubusercontent.com/9700541/213583682-080d7e88-86f9-4e94-aa4e-ed0a71d770c3.png)
   




[GitHub] [spark] aokolnychyi commented on a diff in pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1080854999


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala:
##########
@@ -68,4 +73,58 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
       operation.requiredMetadataAttributes,
       relation)
   }
+
+  protected def resolveRowIdAttrs(
+      relation: DataSourceV2Relation,
+      operation: RowLevelOperation): Seq[AttributeReference] = {
+
+    operation match {
+      case supportsDelta: SupportsDelta =>
+        val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+          supportsDelta.rowId,
+          relation)
+
+        val nullableRowIdAttrs = rowIdAttrs.filter(_.nullable)
+        if (nullableRowIdAttrs.nonEmpty) {
+          throw QueryCompilationErrors.nullableRowIdError(nullableRowIdAttrs)
+        }
+
+        rowIdAttrs
+
+      case _ =>
+        throw QueryCompilationErrors.notDeltaBasedRowLevelOperationError(operation)

Review Comment:
   Reworked this part and added a safe explicit cast.





[GitHub] [spark] huaxingao commented on pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #38005:
URL: https://github.com/apache/spark/pull/38005#issuecomment-1397613359

   +1 LGTM




[GitHub] [spark] cloud-fan commented on pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #38005:
URL: https://github.com/apache/spark/pull/38005#issuecomment-1317931495

   We should probably enrich the PR description to talk about the general approach, e.g. we add a virtual column to indicate the operation (delete, update, or insert).
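   
   To illustrate that idea (hedged: the constant value and names below are made up; the real definitions live in `RowDeltaUtils`), the rewritten query emits one row per change, tagged by a leading operation column that the writer dispatches on:
   
   ```scala
   object OperationColumnSketch extends App {
     // Illustrative constant; see RowDeltaUtils for the real definitions.
     val DELETE_OPERATION = 1
   
     // Conceptual output of a rewritten DELETE: (operation, row ID) pairs.
     val delta = Seq((DELETE_OPERATION, 3L), (DELETE_OPERATION, 7L))
   
     delta.foreach {
       case (DELETE_OPERATION, rowId) => println(s"writer.delete(null, $rowId)")
       case other                     => println(s"unexpected change: $other")
     }
   }
   ```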




[GitHub] [spark] aokolnychyi commented on pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #38005:
URL: https://github.com/apache/spark/pull/38005#issuecomment-1318009704

   @cloud-fan, sounds good. Will do by the end of this week.




[GitHub] [spark] aokolnychyi commented on pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #38005:
URL: https://github.com/apache/spark/pull/38005#issuecomment-1314444425

   cc @cloud-fan @rdblue @amaliujia @huaxingao @dongjoon-hyun @sunchao @viirya
   
   This is an implementation of PR #38004. 




[GitHub] [spark] dongjoon-hyun closed pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun closed pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources
URL: https://github.com/apache/spark/pull/38005




[GitHub] [spark] viirya commented on a diff in pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
viirya commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1081796690


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala:
##########
@@ -477,6 +507,73 @@ object DataWritingSparkTask extends Logging {
   }
 }
 
+object DataWritingSparkTask extends WritingSparkTask[DataWriter[InternalRow]] {
+  override protected def write(writer: DataWriter[InternalRow], row: InternalRow): Unit = {
+    writer.write(row)
+  }
+}
+
+case class DeltaWritingSparkTask(
+    projections: WriteDeltaProjections) extends WritingSparkTask[DeltaWriter[InternalRow]] {
+
+  private lazy val rowProjection = projections.rowProjection.orNull
+  private lazy val rowIdProjection = projections.rowIdProjection
+
+  override protected def write(writer: DeltaWriter[InternalRow], row: InternalRow): Unit = {
+    val operation = row.getInt(0)
+
+    operation match {
+      case DELETE_OPERATION =>
+        rowIdProjection.project(row)
+        writer.delete(null, rowIdProjection)
+
+      case UPDATE_OPERATION =>
+        rowProjection.project(row)
+        rowIdProjection.project(row)
+        writer.update(null, rowIdProjection, rowProjection)
+
+      case INSERT_OPERATION =>
+        rowProjection.project(row)
+        writer.insert(rowProjection)

Review Comment:
   I can see `DELETE_OPERATION` is used in `buildWriteDeltaPlan`.
   
   `UPDATE_OPERATION` and `INSERT_OPERATION` are defined together in this PR too, but I can't seem to find where they are used in the query plan?





[GitHub] [spark] aokolnychyi commented on a diff in pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1081828234


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -274,6 +274,120 @@ case class ReplaceData(
   }
 }
 
+/**
+ * Writes a delta of rows to an existing table during a row-level operation.
+ *
+ * This node references a query that translates a logical DELETE, UPDATE, MERGE operation into
+ * a set of row-level changes to be encoded in the table. Each row in the query represents either
+ * a delete, update or insert and stores the operation type in a special column.
+ *
+ * This node is constructed in rules that rewrite DELETE, UPDATE, MERGE operations for data sources
+ * that can handle deltas of rows.
+ *
+ * @param table a plan that references a row-level operation table
+ * @param condition a condition that defines matching records
+ * @param query a query with a delta of records that should be written
+ * @param originalTable a plan for the original table for which the row-level command was triggered
+ * @param projections projections for row ID, row, metadata attributes
+ * @param write a logical write, if already constructed
+ */
+case class WriteDelta(
+    table: NamedRelation,
+    condition: Expression,
+    query: LogicalPlan,
+    originalTable: NamedRelation,
+    projections: WriteDeltaProjections,
+    write: Option[DeltaWrite] = None) extends RowLevelWrite {
+
+  override val isByName: Boolean = false
+  override val stringArgs: Iterator[Any] = Iterator(table, query, write)
+
+  override lazy val references: AttributeSet = query.outputSet
+
+  lazy val operation: SupportsDelta = {
+    EliminateSubqueryAliases(table) match {
+      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+        operation.asInstanceOf[SupportsDelta]
+      case _ =>
+        throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
+    }
+  }
+
+  override def outputResolved: Boolean = {
+    assert(table.resolved && query.resolved,
+      "`outputResolved` can only be called when `table` and `query` are both resolved.")
+
+    operationResolved && rowAttrsResolved && rowIdAttrsResolved && metadataAttrsResolved
+  }
+
+  private def operationResolved: Boolean = {
+    val attr = query.output.head
+    attr.name == RowDeltaUtils.OPERATION_COLUMN && attr.dataType == IntegerType && !attr.nullable
+  }
+
+  // validates row projection output is compatible with table attributes
+  private def rowAttrsResolved: Boolean = {
+    table.skipSchemaResolution || (projections.rowProjection match {
+      case Some(projection) =>
+        table.output.size == projection.schema.size &&
+          projection.schema.zip(table.output).forall { case (field, outAttr) =>
+            isCompatible(field, outAttr)
+          }
+      case None =>
+        true
+    })
+  }
+
+  // validates row ID projection output is compatible with row ID attributes
+  private def rowIdAttrsResolved: Boolean = {
+    val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+      operation.rowId,
+      originalTable)
+
+    val projectionSchema = projections.rowIdProjection.schema
+    rowIdAttrs.size == projectionSchema.size && projectionSchema.forall { field =>
+      rowIdAttrs.exists(rowIdAttr => isCompatible(field, rowIdAttr))
+    }
+  }
+
+  // validates metadata projection output is compatible with metadata attributes
+  private def metadataAttrsResolved: Boolean = {
+    projections.metadataProjection match {
+      case Some(projection) =>
+        val metadataAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+          operation.requiredMetadataAttributes,
+          originalTable)
+
+        val projectionSchema = projection.schema
+        metadataAttrs.size == projectionSchema.size && projectionSchema.forall { field =>
+          metadataAttrs.exists(metadataAttr => isCompatible(field, metadataAttr))
+        }
+      case None =>
+        true
+    }
+  }
+
+  private def isCompatible(projectionField: StructField, outAttr: NamedExpression): Boolean = {
+    val inType = CharVarcharUtils.getRawType(projectionField.metadata).getOrElse(outAttr.dataType)

Review Comment:
   Good catch! Let me fix it.





[GitHub] [spark] aokolnychyi commented on a diff in pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1081850206


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -274,6 +274,120 @@ case class ReplaceData(
   }
 }
 
+/**
+ * Writes a delta of rows to an existing table during a row-level operation.
+ *
+ * This node references a query that translates a logical DELETE, UPDATE, MERGE operation into
+ * a set of row-level changes to be encoded in the table. Each row in the query represents either
+ * a delete, update or insert and stores the operation type in a special column.
+ *
+ * This node is constructed in rules that rewrite DELETE, UPDATE, MERGE operations for data sources
+ * that can handle deltas of rows.
+ *
+ * @param table a plan that references a row-level operation table
+ * @param condition a condition that defines matching records
+ * @param query a query with a delta of records that should be written
+ * @param originalTable a plan for the original table for which the row-level command was triggered
+ * @param projections projections for row ID, row, metadata attributes
+ * @param write a logical write, if already constructed
+ */
+case class WriteDelta(
+    table: NamedRelation,
+    condition: Expression,
+    query: LogicalPlan,
+    originalTable: NamedRelation,
+    projections: WriteDeltaProjections,
+    write: Option[DeltaWrite] = None) extends RowLevelWrite {
+
+  override val isByName: Boolean = false
+  override val stringArgs: Iterator[Any] = Iterator(table, query, write)
+
+  override lazy val references: AttributeSet = query.outputSet
+
+  lazy val operation: SupportsDelta = {
+    EliminateSubqueryAliases(table) match {
+      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+        operation.asInstanceOf[SupportsDelta]
+      case _ =>
+        throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
+    }
+  }
+
+  override def outputResolved: Boolean = {
+    assert(table.resolved && query.resolved,
+      "`outputResolved` can only be called when `table` and `query` are both resolved.")
+
+    operationResolved && rowAttrsResolved && rowIdAttrsResolved && metadataAttrsResolved
+  }
+
+  private def operationResolved: Boolean = {
+    val attr = query.output.head
+    attr.name == RowDeltaUtils.OPERATION_COLUMN && attr.dataType == IntegerType && !attr.nullable
+  }
+
+  // validates row projection output is compatible with table attributes
+  private def rowAttrsResolved: Boolean = {
+    table.skipSchemaResolution || (projections.rowProjection match {
+      case Some(projection) =>
+        table.output.size == projection.schema.size &&
+          projection.schema.zip(table.output).forall { case (field, outAttr) =>
+            isCompatible(field, outAttr)
+          }
+      case None =>
+        true
+    })
+  }
+
+  // validates row ID projection output is compatible with row ID attributes
+  private def rowIdAttrsResolved: Boolean = {
+    val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+      operation.rowId,
+      originalTable)
+
+    val projectionSchema = projections.rowIdProjection.schema
+    rowIdAttrs.size == projectionSchema.size && projectionSchema.forall { field =>
+      rowIdAttrs.exists(rowIdAttr => isCompatible(field, rowIdAttr))
+    }
+  }
+
+  // validates metadata projection output is compatible with metadata attributes
+  private def metadataAttrsResolved: Boolean = {
+    projections.metadataProjection match {
+      case Some(projection) =>
+        val metadataAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+          operation.requiredMetadataAttributes,
+          originalTable)
+
+        val projectionSchema = projection.schema
+        metadataAttrs.size == projectionSchema.size && projectionSchema.forall { field =>
+          metadataAttrs.exists(metadataAttr => isCompatible(field, metadataAttr))
+        }
+      case None =>
+        true
+    }
+  }
+
+  private def isCompatible(projectionField: StructField, outAttr: NamedExpression): Boolean = {
+    val inType = CharVarcharUtils.getRawType(projectionField.metadata).getOrElse(outAttr.dataType)

Review Comment:
   Fixed, resolving this.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1024306166


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala:
##########
@@ -68,4 +73,58 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
       operation.requiredMetadataAttributes,
       relation)
   }
+
+  protected def resolveRowIdAttrs(
+      relation: DataSourceV2Relation,
+      operation: RowLevelOperation): Seq[AttributeReference] = {
+
+    operation match {
+      case supportsDelta: SupportsDelta =>
+        val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+          supportsDelta.rowId,
+          relation)
+
+        val nullableRowIdAttrs = rowIdAttrs.filter(_.nullable)
+        if (nullableRowIdAttrs.nonEmpty) {
+          throw QueryCompilationErrors.nullableRowIdError(nullableRowIdAttrs)
+        }
+
+        rowIdAttrs
+
+      case _ =>
+        throw QueryCompilationErrors.notDeltaBasedRowLevelOperationError(operation)

Review Comment:
   this should never happen; we can throw `SparkException.internalError` directly.
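   
   For example, a sketch of that suggestion (hedged: the helper name `requireDelta` is illustrative, not from this PR):
   
   ```scala
   import org.apache.spark.SparkException
   import org.apache.spark.sql.connector.write.{RowLevelOperation, SupportsDelta}
   
   def requireDelta(operation: RowLevelOperation): SupportsDelta = operation match {
     case supportsDelta: SupportsDelta => supportsDelta
     case _ =>
       // Unreachable for delta-based rewrites; surface as an internal error.
       throw SparkException.internalError(s"Expected SupportsDelta, got: $operation")
   }
   ```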






[GitHub] [spark] aokolnychyi commented on a diff in pull request #38005: [SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based sources

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #38005:
URL: https://github.com/apache/spark/pull/38005#discussion_r1022077282


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala:
##########
@@ -477,6 +507,73 @@ object DataWritingSparkTask extends Logging {
   }
 }
 
+object DataWritingSparkTask extends WritingSparkTask[DataWriter[InternalRow]] {
+  override protected def write(writer: DataWriter[InternalRow], row: InternalRow): Unit = {
+    writer.write(row)
+  }
+}
+
+case class DeltaWritingSparkTask(
+    projections: WriteDeltaProjections) extends WritingSparkTask[DeltaWriter[InternalRow]] {
+
+  private lazy val rowProjection = projections.rowProjection.orNull

Review Comment:
   I am reusing the same `ProjectingInternalRow` instance for all rows to avoid extra copies. Without reuse, performance would be fairly poor. My assumption is that sources don't usually accumulate incoming records before passing them to the writer, and if they do, they can call `copy()`.
   
   Any feedback on this point is highly appreciated.
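   
   A short sketch of the copy contract this implies (hedged: the helper below is hypothetical): a source that buffers rows must call `copy()`, because the projection instance is re-pointed at every incoming row.
   
   ```scala
   import scala.collection.mutable.ArrayBuffer
   import org.apache.spark.sql.catalyst.{InternalRow, ProjectingInternalRow}
   
   def bufferRowIds(
       rows: Iterator[InternalRow],
       rowIdProjection: ProjectingInternalRow): Seq[InternalRow] = {
     val buffered = ArrayBuffer.empty[InternalRow]
     rows.foreach { row =>
       rowIdProjection.project(row)
       // Without copy(), every buffered entry would alias the projection
       // and end up pointing at the last row seen.
       buffered += rowIdProjection.copy()
     }
     buffered.toSeq
   }
   ```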



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org