Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/03 08:34:35 UTC

[GitHub] [iceberg] szehon-ho commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

szehon-ho commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798326375



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row into the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {
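
A caller-side sketch of the contract this javadoc describes may help. It assumes the updated row stays in the same spec/partition; the helper name and parameters are illustrative, not from the PR:

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.io.PositionDeltaWriter;

    // Illustrative helper: the writer, row type, and position values
    // come from the engine and are hypothetical here.
    class PositionUpdateExample {
      static <T> void updateRow(
          PositionDeltaWriter<T> writer,
          CharSequence path, long pos,   // position of the original row
          T newRow,                      // new version of that row
          PartitionSpec spec, StructLike partition) {
        // Per the javadoc: first mark the original position as deleted...
        writer.delete(path, pos, spec, partition);
        // ...then write the new version of the row.
        writer.update(newRow, spec, partition);
      }
    }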

Review comment:
       What do you think about having update() take the arguments (CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) and invoke delete() internally? That seems more fitting for the name than having the caller call delete() first and then this method.
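
For illustration, a rough sketch of the suggested signature as a default method. This sketches the reviewer's idea, not the merged API, and it assumes the original and updated rows share one spec/partition, since the suggested signature carries only one:

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.StructLike;

    // Hypothetical variant of PositionDeltaWriter sketching the
    // suggestion: update() receives the original row position and
    // issues the positional delete itself.
    interface PositionDeltaWriterSketch<T> {
      void insert(T row, PartitionSpec spec, StructLike partition);

      void delete(CharSequence path, long pos, PartitionSpec spec, StructLike partition);

      void update(T row, PartitionSpec spec, StructLike partition);

      // One call replaces the delete()-then-update() pair that the
      // javadoc above requires of callers.
      default void update(CharSequence path, long pos, T row,
                          PartitionSpec spec, StructLike partition) {
        delete(path, pos, spec, partition); // remove the original version
        update(row, spec, partition);       // write the new version
      }
    }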

##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
##########
@@ -126,6 +128,33 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     ReplaceData(writeRelation, updatedAndRemainingRowsPlan, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      table: RowLevelOperationTable,
+      assignments: Seq[Assignment],
+      cond: Expression): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, table.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, table.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, table, metadataAttrs, rowIdAttrs)
+
+    // build a plan for updated records that match the cond
+    val matchedRowsPlan = Filter(cond, readRelation)
+    val updatedRowsPlan = buildUpdateProjection(matchedRowsPlan, assignments)
+    val operationType = Alias(Literal(UPDATE_OPERATION), OPERATION_COLUMN)()
+    val project = Project(operationType +: updatedRowsPlan.output, updatedRowsPlan)
+
+    // build a plan to write the row delta to the table
+    val writeRelation = relation.copy(table = table)

Review comment:
       Nit: the table argument is unnecessary here, since copy() duplicates every field exactly. (Though I see this is copied almost verbatim from Spark.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


