Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/25 22:06:29 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

aokolnychyi opened a new pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984


   This PR implements merge-on-read UPDATE in Spark 3.2.
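
   For context, merge-on-read UPDATE is selected per table through the
   write.update.mode table property. A minimal usage sketch (illustrative
   only: the catalog, table, and column names are made up, and a Java
   SparkSession named `spark` is assumed):

       // switch UPDATE from the default copy-on-write to merge-on-read
       spark.sql("ALTER TABLE demo.db.events " +
           "SET TBLPROPERTIES ('write.update.mode' = 'merge-on-read')");

       // matching rows are encoded as position deletes plus newly written
       // data files instead of rewriting every file that contains them
       spark.sql("UPDATE demo.db.events SET status = 'done' WHERE id = 42");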


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799704230



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper deletePartitionRowWrapper;
+    private final Map<Integer, StructProjection> deletePartitionProjections;
+    private final int specIdOrdinal;
+    private final int partitionOrdinal;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+
+    private boolean closed = false;
+
+    UnpartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+                             OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory,
+                             Context context) {
+
+      Preconditions.checkArgument(table.spec().isUnpartitioned(), "Data spec must be unpartitioned");
+
+      ClusteredDataWriter<InternalRow> insertWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredDataWriter<InternalRow> updateWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredPositionDeleteWriter<InternalRow> deleteWriter = new ClusteredPositionDeleteWriter<>(
+          writerFactory, deleteFileFactory, table.io(),
+          context.deleteFileFormat(), context.targetDeleteFileSize());
+
+      this.delegate = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
+      this.io = table.io();
+      this.dataSpec = table.spec();
+      this.specs = table.specs();
+
+      Types.StructType partitionType = Partitioning.partitionType(table);
+      this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
+      this.deletePartitionProjections = buildPartitionProjections(partitionType, specs);
+
+      this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
+      this.partitionOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+      this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
+      this.positionOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+    }
+
+    @Override
+    public void delete(InternalRow meta, InternalRow id) throws IOException {
+      int specId = meta.getInt(specIdOrdinal);
+      PartitionSpec spec = specs.get(specId);
+
+      InternalRow partition = meta.getStruct(partitionOrdinal, deletePartitionRowWrapper.size());
+      StructProjection partitionProjection = deletePartitionProjections.get(specId);
+      partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition));
+
+      String file = id.getString(fileOrdinal);
+      long position = id.getLong(positionOrdinal);
+      delegate.delete(file, position, spec, partitionProjection);
+    }
+
+    @Override
+    public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOException {
+      delete(meta, id);
+      delegate.update(row, dataSpec, null);
+    }
+
+    @Override
+    public void insert(InternalRow row) throws IOException {
+      delegate.insert(row, dataSpec, null);
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      return new DeltaTaskCommit(result);
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      cleanFiles(io, Arrays.asList(result.dataFiles()));
+      cleanFiles(io, Arrays.asList(result.deleteFiles()));
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        delegate.close();
+        this.closed = true;
+      }
+    }
+  }
+
+  private static class PartitionedDeltaWriter extends BaseDeltaWriter {

Review comment:
       Just one or two extra calls for every row. I can implement the accessors in a way that should allow inlining, but we can never guarantee that they will actually be inlined.






[GitHub] [iceberg] szehon-ho commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799872258



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       Yeah, I see. That is unfortunately quite a few arguments, though still under checkstyle's limit. In my opinion, a single method is cleaner than having the client remember to call this one after delete(), but it is definitely a style choice, and I am not sure whether anyone else agrees (@rdblue, @RussellSpitzer, @jackye1995).






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799690563



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       @szehon-ho, I thought about it, but the original record may be in one spec/partition while the new record may be written to another. As a result, we would need path, pos, row, oldSpec, oldPartition, newSpec, and newPartition, which makes for a rather long argument list. I am happy to reconsider if folks prefer that API instead of this one.
   
   @rdblue, correct, your last comment is spot on.
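
   For illustration, the two API shapes being weighed might look like this
   (a sketch, not the PR's code; the combined variant and its parameter
   names are hypothetical):

       import org.apache.iceberg.PartitionSpec;
       import org.apache.iceberg.StructLike;

       interface PositionDeltaWriterSketch<T> {
         void delete(CharSequence path, long pos, PartitionSpec spec, StructLike partition);

         // shape used in the PR: the caller first deletes the old position,
         // then writes the new version of the row to its (possibly new) partition
         void update(T row, PartitionSpec spec, StructLike partition);

         // combined shape: a single call, but it must carry both the old
         // position's spec/partition and the new row's spec/partition
         default void update(CharSequence path, long pos,
                             PartitionSpec oldSpec, StructLike oldPartition,
                             T row, PartitionSpec newSpec, StructLike newPartition) {
           delete(path, pos, oldSpec, oldPartition);
           update(row, newSpec, newPartition);
         }
       }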






[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798138304



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       If I understand correctly, the problem that this fixes is that the inserts are expected to be clustered, but we don't have the same expectation for updates because it's hard to come up with a single sort order that works for all cases (inserts, updates, and deletes).
   
   So we separate updates that are not expected to be clustered and use a fanout writer for them?






[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798745128



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
##########
@@ -126,6 +128,33 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     ReplaceData(writeRelation, updatedAndRemainingRowsPlan, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      table: RowLevelOperationTable,
+      assignments: Seq[Assignment],
+      cond: Expression): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, table.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, table.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, table, metadataAttrs, rowIdAttrs)
+
+    // build a plan for updated records that match the cond
+    val matchedRowsPlan = Filter(cond, readRelation)
+    val updatedRowsPlan = buildUpdateProjection(matchedRowsPlan, assignments)
+    val operationType = Alias(Literal(UPDATE_OPERATION), OPERATION_COLUMN)()
+    val project = Project(operationType +: updatedRowsPlan.output, updatedRowsPlan)
+
+    // build a plan to write the row delta to the table
+    val writeRelation = relation.copy(table = table)

Review comment:
       This caught me the first time I saw it, too. What's happening here is that we're replacing the original table with the `RowLevelOperationTable` that wraps the `RowLevelOperation`. The reason is so that scans produced by the table are coordinated with writes done in the table. We use the table interface to avoid needing to handle the operation separately.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r792166036



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
##########
@@ -198,7 +198,7 @@ private DistributionMode adjustWriteDistributionMode(DistributionMode mode) {
     }
   }
 
-  public DistributionMode copyOnWriteDeleteDistributionMode() {

Review comment:
       Given the recent changes, I am no longer sure we need separate methods for copy-on-write and merge-on-read.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799702347



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper deletePartitionRowWrapper;
+    private final Map<Integer, StructProjection> deletePartitionProjections;
+    private final int specIdOrdinal;
+    private final int partitionOrdinal;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+
+    private boolean closed = false;
+
+    UnpartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+                             OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory,
+                             Context context) {
+
+      Preconditions.checkArgument(table.spec().isUnpartitioned(), "Data spec must be unpartitioned");
+
+      ClusteredDataWriter<InternalRow> insertWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredDataWriter<InternalRow> updateWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredPositionDeleteWriter<InternalRow> deleteWriter = new ClusteredPositionDeleteWriter<>(
+          writerFactory, deleteFileFactory, table.io(),
+          context.deleteFileFormat(), context.targetDeleteFileSize());
+
+      this.delegate = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
+      this.io = table.io();
+      this.dataSpec = table.spec();
+      this.specs = table.specs();
+
+      Types.StructType partitionType = Partitioning.partitionType(table);
+      this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
+      this.deletePartitionProjections = buildPartitionProjections(partitionType, specs);
+
+      this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
+      this.partitionOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+      this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
+      this.positionOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+    }
+
+    @Override
+    public void delete(InternalRow meta, InternalRow id) throws IOException {
+      int specId = meta.getInt(specIdOrdinal);
+      PartitionSpec spec = specs.get(specId);
+
+      InternalRow partition = meta.getStruct(partitionOrdinal, deletePartitionRowWrapper.size());
+      StructProjection partitionProjection = deletePartitionProjections.get(specId);
+      partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition));
+
+      String file = id.getString(fileOrdinal);
+      long position = id.getLong(positionOrdinal);
+      delegate.delete(file, position, spec, partitionProjection);
+    }
+
+    @Override
+    public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOException {
+      delete(meta, id);
+      delegate.update(row, dataSpec, null);
+    }
+
+    @Override
+    public void insert(InternalRow row) throws IOException {
+      delegate.insert(row, dataSpec, null);
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      return new DeltaTaskCommit(result);
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      cleanFiles(io, Arrays.asList(result.dataFiles()));
+      cleanFiles(io, Arrays.asList(result.deleteFiles()));
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        delegate.close();
+        this.closed = true;
+      }
+    }
+  }
+
+  private static class PartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper deletePartitionRowWrapper;
+    private final Map<Integer, StructProjection> deletePartitionProjections;
+    private final PartitionKey dataPartitionKey;
+    private final InternalRowWrapper internalRowDataWrapper;
+    private final int specIdOrdinal;
+    private final int partitionOrdinal;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+
+    private boolean closed = false;
+
+    PartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+                           OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory,
+                           Context context) {
+
+      Preconditions.checkArgument(table.spec().isPartitioned(), "Data spec must be partitioned");
+
+      PartitioningWriter<InternalRow, DataWriteResult> insertWriter;
+      if (context.fanoutWriterEnabled()) {
+        insertWriter = new FanoutDataWriter<>(
+            writerFactory, dataFileFactory, table.io(),
+            context.dataFileFormat(), context.targetDataFileSize());
+      } else {
+        insertWriter = new ClusteredDataWriter<>(
+            writerFactory, dataFileFactory, table.io(),
+            context.dataFileFormat(), context.targetDataFileSize());
+      }
+
+      // always use a separate fanout data writer for updates as they may be out of order

Review comment:
       Yeah, this happens when we update a column that is used in the spec and the updated record moves to a different partition.
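
   A hedged illustration (catalog, table, and column names are made up): with
   a table partitioned by days(ts), an UPDATE that shifts ts moves rows across
   partitions, so the new rows arrive in no particular partition order even
   though their deletes stay clustered by the old partition:

       spark.sql("CREATE TABLE demo.db.events (id BIGINT, ts TIMESTAMP) " +
           "USING iceberg PARTITIONED BY (days(ts))");

       // matching rows land in the next day's partition, while their position
       // deletes are written against the partitions they came from
       spark.sql("UPDATE demo.db.events SET ts = ts + INTERVAL 1 DAY " +
           "WHERE id % 2 = 0");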






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799700542



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;

Review comment:
       Correct, we need `specs` for delete projections and we need `dataSpec` to attach the correct spec ID to the produced data file (since we did not reserve a particular spec ID for unpartitioned specs).






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r792159480



##########
File path: core/src/main/java/org/apache/iceberg/io/BasePositionDeltaWriter.java
##########
@@ -27,25 +27,35 @@
 
 public class BasePositionDeltaWriter<T> implements PositionDeltaWriter<T> {
 
-  private final PartitioningWriter<T, DataWriteResult> dataWriter;
+  private final PartitioningWriter<T, DataWriteResult> insertWriter;

Review comment:
       Let's discuss the interface change instead (below).






[GitHub] [iceberg] szehon-ho commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798326375



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       What do you think about having update() take the arguments (CharSequence path, long pos, T row, PartitionSpec spec, StructLike partition) and internally invoke delete()? That might be more fitting of the name than having the caller call delete() first and then this method.

##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
##########
@@ -126,6 +128,33 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     ReplaceData(writeRelation, updatedAndRemainingRowsPlan, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      table: RowLevelOperationTable,
+      assignments: Seq[Assignment],
+      cond: Expression): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, table.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, table.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, table, metadataAttrs, rowIdAttrs)
+
+    // build a plan for updated records that match the cond
+    val matchedRowsPlan = Filter(cond, readRelation)
+    val updatedRowsPlan = buildUpdateProjection(matchedRowsPlan, assignments)
+    val operationType = Alias(Literal(UPDATE_OPERATION), OPERATION_COLUMN)()
+    val project = Project(operationType +: updatedRowsPlan.output, updatedRowsPlan)
+
+    // build a plan to write the row delta to the table
+    val writeRelation = relation.copy(table = table)

Review comment:
       Nit: the table argument is unnecessary here, as it copies every field exactly. (Though I see this is copied pretty much verbatim from Spark.)






[GitHub] [iceberg] szehon-ho commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798156043



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
##########
@@ -126,6 +128,33 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     ReplaceData(writeRelation, updatedAndRemainingRowsPlan, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      table: RowLevelOperationTable,
+      assignments: Seq[Assignment],
+      cond: Expression): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, table.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, table.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, table, metadataAttrs, rowIdAttrs)
+
+    // build a plan for updated records that match the cond
+    val matchedRowsPlan = Filter(cond, readRelation)
+    val updatedRowsPlan = buildUpdateProjection(matchedRowsPlan, assignments)
+    val operationType = Alias(Literal(UPDATE_OPERATION), OPERATION_COLUMN)()
+    val project = Project(operationType +: updatedRowsPlan.output, updatedRowsPlan)
+
+    // build a plan to write the row delta to the table
+    val writeRelation = relation.copy(table = table)

Review comment:
       Nit: the table argument seems unnecessary. (Though I see this is copied pretty much verbatim from Spark.)






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799693682



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDistributionAndOrderingUtil.java
##########
@@ -146,14 +146,14 @@ private static Distribution buildCopyOnWriteDeleteUpdateDistribution(Table table
 
   public static Distribution buildPositionDeltaDistribution(Table table, Command command,
                                                             DistributionMode distributionMode) {
-    if (command == DELETE) {
-      return positionDeleteDistribution(distributionMode);
+    if (command == DELETE || command == UPDATE) {
+      return buildPositionDeleteUpdateDistribution(distributionMode);
     } else {
       throw new IllegalArgumentException("Only position deletes are currently supported");

Review comment:
       Good catch, let me fix it.






[GitHub] [iceberg] szehon-ho commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799881409



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       Sure, no problem, don't let it block.






[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798763299



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper deletePartitionRowWrapper;
+    private final Map<Integer, StructProjection> deletePartitionProjections;
+    private final int specIdOrdinal;
+    private final int partitionOrdinal;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+
+    private boolean closed = false;
+
+    UnpartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+                             OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory,
+                             Context context) {
+
+      Preconditions.checkArgument(table.spec().isUnpartitioned(), "Data spec must be unpartitioned");
+
+      ClusteredDataWriter<InternalRow> insertWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredDataWriter<InternalRow> updateWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredPositionDeleteWriter<InternalRow> deleteWriter = new ClusteredPositionDeleteWriter<>(
+          writerFactory, deleteFileFactory, table.io(),
+          context.deleteFileFormat(), context.targetDeleteFileSize());
+
+      this.delegate = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
+      this.io = table.io();
+      this.dataSpec = table.spec();
+      this.specs = table.specs();
+
+      Types.StructType partitionType = Partitioning.partitionType(table);
+      this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
+      this.deletePartitionProjections = buildPartitionProjections(partitionType, specs);
+
+      this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
+      this.partitionOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+      this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
+      this.positionOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+    }
+
+    @Override
+    public void delete(InternalRow meta, InternalRow id) throws IOException {
+      int specId = meta.getInt(specIdOrdinal);
+      PartitionSpec spec = specs.get(specId);
+
+      InternalRow partition = meta.getStruct(partitionOrdinal, deletePartitionRowWrapper.size());
+      StructProjection partitionProjection = deletePartitionProjections.get(specId);
+      partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition));
+
+      String file = id.getString(fileOrdinal);
+      long position = id.getLong(positionOrdinal);
+      delegate.delete(file, position, spec, partitionProjection);
+    }
+
+    @Override
+    public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOException {
+      delete(meta, id);
+      delegate.update(row, dataSpec, null);
+    }
+
+    @Override
+    public void insert(InternalRow row) throws IOException {
+      delegate.insert(row, dataSpec, null);
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      return new DeltaTaskCommit(result);
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      cleanFiles(io, Arrays.asList(result.dataFiles()));
+      cleanFiles(io, Arrays.asList(result.deleteFiles()));
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        delegate.close();
+        this.closed = true;
+      }
+    }
+  }
+
+  private static class PartitionedDeltaWriter extends BaseDeltaWriter {

Review comment:
       It definitely looks to me like there is a lot of duplication between the partitioned and unpartitioned cases. How bad is it to merge the two as you commented above? I think I would probably prefer that.






[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798766661



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       I see. The partitioned writer has a comment about this. Updates come in so that deletes are clustered and ordered correctly, by the path and position that will be deleted. But an update may have moved the row to a different partition, so the clustering for deletes may not match the clustering for updates. So we do need a separate fanout writer.








[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798761578



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper deletePartitionRowWrapper;
+    private final Map<Integer, StructProjection> deletePartitionProjections;
+    private final int specIdOrdinal;
+    private final int partitionOrdinal;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+
+    private boolean closed = false;
+
+    UnpartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+                             OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory,
+                             Context context) {
+
+      Preconditions.checkArgument(table.spec().isUnpartitioned(), "Data spec must be unpartitioned");
+
+      ClusteredDataWriter<InternalRow> insertWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredDataWriter<InternalRow> updateWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredPositionDeleteWriter<InternalRow> deleteWriter = new ClusteredPositionDeleteWriter<>(
+          writerFactory, deleteFileFactory, table.io(),
+          context.deleteFileFormat(), context.targetDeleteFileSize());
+
+      this.delegate = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
+      this.io = table.io();
+      this.dataSpec = table.spec();
+      this.specs = table.specs();
+
+      Types.StructType partitionType = Partitioning.partitionType(table);
+      this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
+      this.deletePartitionProjections = buildPartitionProjections(partitionType, specs);
+
+      this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
+      this.partitionOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+      this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
+      this.positionOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+    }
+
+    @Override
+    public void delete(InternalRow meta, InternalRow id) throws IOException {
+      int specId = meta.getInt(specIdOrdinal);
+      PartitionSpec spec = specs.get(specId);
+
+      InternalRow partition = meta.getStruct(partitionOrdinal, deletePartitionRowWrapper.size());
+      StructProjection partitionProjection = deletePartitionProjections.get(specId);
+      partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition));
+
+      String file = id.getString(fileOrdinal);
+      long position = id.getLong(positionOrdinal);
+      delegate.delete(file, position, spec, partitionProjection);
+    }
+
+    @Override
+    public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOException {
+      delete(meta, id);
+      delegate.update(row, dataSpec, null);
+    }
+
+    @Override
+    public void insert(InternalRow row) throws IOException {
+      delegate.insert(row, dataSpec, null);
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      return new DeltaTaskCommit(result);
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      cleanFiles(io, Arrays.asList(result.dataFiles()));
+      cleanFiles(io, Arrays.asList(result.deleteFiles()));
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        delegate.close();
+        this.closed = true;
+      }
+    }
+  }
+
+  private static class PartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper deletePartitionRowWrapper;
+    private final Map<Integer, StructProjection> deletePartitionProjections;
+    private final PartitionKey dataPartitionKey;
+    private final InternalRowWrapper internalRowDataWrapper;
+    private final int specIdOrdinal;
+    private final int partitionOrdinal;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+
+    private boolean closed = false;
+
+    PartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+                           OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory,
+                           Context context) {
+
+      Preconditions.checkArgument(table.spec().isPartitioned(), "Data spec must be partitioned");
+
+      PartitioningWriter<InternalRow, DataWriteResult> insertWriter;
+      if (context.fanoutWriterEnabled()) {
+        insertWriter = new FanoutDataWriter<>(
+            writerFactory, dataFileFactory, table.io(),
+            context.dataFileFormat(), context.targetDataFileSize());
+      } else {
+        insertWriter = new ClusteredDataWriter<>(
+            writerFactory, dataFileFactory, table.io(),
+            context.dataFileFormat(), context.targetDataFileSize());
+      }
+
+      // always use a separate fanout data writer for updates as they may be out of order

Review comment:
       If I remember correctly, this is because we sort by partition, file, and pos, but the correct output order depends on the new data. So we always delete with clustering, but the updated rows may no longer conform to the original clustering. Right?








[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r792167717



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {

Review comment:
       Both `UnpartitionedDeltaWriter` and `PartitionedDeltaWriter` share the `delete` implementation, but the `insert` and `update` methods are different. We could refactor the two into a single class, but that would raise questions about extra method calls. We could go with an accessor and hope it will be inlined by the JIT, but we can't know for sure.
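
   A rough sketch of that single-class shape (illustrative only: `dataSpec()`
   and `dataPartition(row)` are hypothetical accessors, and `delegate` is
   assumed to be the shared PositionDeltaWriter field from the quoted code):

       private abstract static class MergedDeltaWriter extends BaseDeltaWriter {
         // the shared delete(meta, id) implementation would live here

         protected abstract PartitionSpec dataSpec();

         // expected to return null for unpartitioned tables
         protected abstract StructLike dataPartition(InternalRow row);

         @Override
         public void insert(InternalRow row) throws IOException {
           // one or two extra virtual calls per row; the JIT may inline the
           // accessors, but that cannot be guaranteed
           delegate.insert(row, dataSpec(), dataPartition(row));
         }

         @Override
         public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOException {
           delete(meta, id);
           delegate.update(row, dataSpec(), dataPartition(row));
         }
       }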






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799864319



##########
File path: data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
##########
@@ -79,13 +79,16 @@ public void setupTable() throws Exception {
   public void testPositionDeltaInsertOnly() throws IOException {
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
 
-    ClusteredDataWriter<T> dataWriter = new ClusteredDataWriter<>(
+    ClusteredDataWriter<T> insertWriter = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);

Review comment:
       Wrapped for consistency with the rest of the surrounding code.

##########
File path: data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
##########
@@ -144,13 +147,16 @@ public void testPositionDeltaDeleteOnly() throws IOException {
     PartitionSpec unpartitionedSpec = table.specs().get(0);
     PartitionSpec partitionedSpec = table.specs().get(1);
 
-    ClusteredDataWriter<T> dataWriter = new ClusteredDataWriter<>(
+    ClusteredDataWriter<T> insertWriter = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);

Review comment:
       Same here.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799879786



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       Created #4042.






[GitHub] [iceberg] aokolnychyi merged pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984


   




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799865337



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper deletePartitionRowWrapper;
+    private final Map<Integer, StructProjection> deletePartitionProjections;
+    private final int specIdOrdinal;
+    private final int partitionOrdinal;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+
+    private boolean closed = false;
+
+    UnpartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+                             OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory,
+                             Context context) {
+
+      Preconditions.checkArgument(table.spec().isUnpartitioned(), "Data spec must be unpartitioned");
+
+      ClusteredDataWriter<InternalRow> insertWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredDataWriter<InternalRow> updateWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredPositionDeleteWriter<InternalRow> deleteWriter = new ClusteredPositionDeleteWriter<>(
+          writerFactory, deleteFileFactory, table.io(),
+          context.deleteFileFormat(), context.targetDeleteFileSize());
+
+      this.delegate = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
+      this.io = table.io();
+      this.dataSpec = table.spec();
+      this.specs = table.specs();
+
+      Types.StructType partitionType = Partitioning.partitionType(table);
+      this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
+      this.deletePartitionProjections = buildPartitionProjections(partitionType, specs);
+
+      this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
+      this.partitionOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+      this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
+      this.positionOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+    }
+
+    @Override
+    public void delete(InternalRow meta, InternalRow id) throws IOException {
+      int specId = meta.getInt(specIdOrdinal);
+      PartitionSpec spec = specs.get(specId);
+
+      InternalRow partition = meta.getStruct(partitionOrdinal, deletePartitionRowWrapper.size());
+      StructProjection partitionProjection = deletePartitionProjections.get(specId);
+      partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition));
+
+      String file = id.getString(fileOrdinal);
+      long position = id.getLong(positionOrdinal);
+      delegate.delete(file, position, spec, partitionProjection);
+    }
+
+    @Override
+    public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOException {
+      delete(meta, id);
+      delegate.update(row, dataSpec, null);
+    }
+
+    @Override
+    public void insert(InternalRow row) throws IOException {
+      delegate.insert(row, dataSpec, null);
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      return new DeltaTaskCommit(result);
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      cleanFiles(io, Arrays.asList(result.dataFiles()));
+      cleanFiles(io, Arrays.asList(result.deleteFiles()));
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        delegate.close();
+        this.closed = true;
+      }
+    }
+  }
+
+  private static class PartitionedDeltaWriter extends BaseDeltaWriter {

Review comment:
       I combined these two classes into one and exposed a protected field in the parent instead.
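
For reference, a minimal sketch of that shape, with simplified, hypothetical names (`BaseDeltaWriterSketch`, `DeltaWriterSketch`, and `derivePartition` are placeholders, not the actual Iceberg classes):

```java
// A hypothetical sketch of the refactoring described above: instead of
// near-identical UnpartitionedDeltaWriter and PartitionedDeltaWriter classes,
// one writer computes the data partition only when the table is partitioned,
// and the parent exposes the partition to shared code via a protected member.
abstract class BaseDeltaWriterSketch {
  protected Object dataPartition = null; // stays null for unpartitioned data specs
}

class DeltaWriterSketch extends BaseDeltaWriterSketch {
  private final boolean partitioned;

  DeltaWriterSketch(boolean partitioned) {
    this.partitioned = partitioned;
  }

  void insert(Object row) {
    if (partitioned) {
      dataPartition = derivePartition(row); // stands in for PartitionKey#partition(row)
    }
    // write(row, dataPartition); a null partition means unpartitioned
  }

  private Object derivePartition(Object row) {
    return row; // placeholder for real partition key derivation
  }
}
```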






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799879008



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       Let me create an issue where we can discuss these two options to avoid blocking the subsequent MERGE PR.






[GitHub] [iceberg] szehon-ho commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799872258



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       Yeah, I see; that is unfortunately quite a few arguments, though still under the checkstyle limit. In my opinion, a single method is much cleaner than having the caller remember to call this one after delete(), but it is definitely a style choice and I am not sure whether anyone else agrees (@rdblue, @RussellSpitzer, @jackye1995). Otherwise we might rename the method to something a bit awkward like 'updateDeleted' to make the intent clearer.
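
To make the trade-off concrete, the combined alternative being discussed might look roughly like this (a hypothetical shape, not the actual `PositionDeltaWriter` interface):

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;

// Hypothetical combined form: one call carries both the old row position and
// the new version, so the caller cannot forget the delete half of an update.
interface CombinedPositionDeltaWriter<T> {
  void insert(T row, PartitionSpec spec, StructLike partition);

  void delete(CharSequence path, long pos, PartitionSpec spec, StructLike partition);

  // e.g. "updateDeleted": deletes the old position and writes the new version
  void update(CharSequence oldPath, long oldPos, PartitionSpec oldSpec, StructLike oldPartition,
              T newRow, PartitionSpec newSpec, StructLike newPartition);
}
```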






[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798137111



##########
File path: data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
##########
@@ -79,13 +79,16 @@ public void setupTable() throws Exception {
   public void testPositionDeltaInsertOnly() throws IOException {
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
 
-    ClusteredDataWriter<T> dataWriter = new ClusteredDataWriter<>(
+    ClusteredDataWriter<T> insertWriter = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);

Review comment:
       Does this need to wrap here?






[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798755516



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;

Review comment:
       Yeah, I think we need `specs` for the incoming delete partition projections, but we know that `dataSpec` is unpartitioned. I guess we just need that to pass into the delegates later?






[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798758598



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper deletePartitionRowWrapper;
+    private final Map<Integer, StructProjection> deletePartitionProjections;
+    private final int specIdOrdinal;
+    private final int partitionOrdinal;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+
+    private boolean closed = false;
+
+    UnpartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+                             OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory,
+                             Context context) {
+
+      Preconditions.checkArgument(table.spec().isUnpartitioned(), "Data spec must be unpartitioned");
+
+      ClusteredDataWriter<InternalRow> insertWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredDataWriter<InternalRow> updateWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredPositionDeleteWriter<InternalRow> deleteWriter = new ClusteredPositionDeleteWriter<>(
+          writerFactory, deleteFileFactory, table.io(),
+          context.deleteFileFormat(), context.targetDeleteFileSize());
+
+      this.delegate = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
+      this.io = table.io();
+      this.dataSpec = table.spec();
+      this.specs = table.specs();
+
+      Types.StructType partitionType = Partitioning.partitionType(table);
+      this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
+      this.deletePartitionProjections = buildPartitionProjections(partitionType, specs);
+
+      this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
+      this.partitionOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+      this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
+      this.positionOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+    }
+
+    @Override
+    public void delete(InternalRow meta, InternalRow id) throws IOException {
+      int specId = meta.getInt(specIdOrdinal);
+      PartitionSpec spec = specs.get(specId);
+
+      InternalRow partition = meta.getStruct(partitionOrdinal, deletePartitionRowWrapper.size());
+      StructProjection partitionProjection = deletePartitionProjections.get(specId);
+      partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition));
+
+      String file = id.getString(fileOrdinal);
+      long position = id.getLong(positionOrdinal);
+      delegate.delete(file, position, spec, partitionProjection);
+    }
+
+    @Override
+    public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOException {
+      delete(meta, id);
+      delegate.update(row, dataSpec, null);
+    }
+
+    @Override
+    public void insert(InternalRow row) throws IOException {
+      delegate.insert(row, dataSpec, null);
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      return new DeltaTaskCommit(result);
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      WriteResult result = delegate.result();
+      cleanFiles(io, Arrays.asList(result.dataFiles()));
+      cleanFiles(io, Arrays.asList(result.deleteFiles()));
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        delegate.close();
+        this.closed = true;
+      }
+    }
+  }
+
+  private static class PartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper deletePartitionRowWrapper;
+    private final Map<Integer, StructProjection> deletePartitionProjections;
+    private final PartitionKey dataPartitionKey;
+    private final InternalRowWrapper internalRowDataWrapper;
+    private final int specIdOrdinal;
+    private final int partitionOrdinal;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+
+    private boolean closed = false;
+
+    PartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+                           OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory,
+                           Context context) {
+
+      Preconditions.checkArgument(table.spec().isPartitioned(), "Data spec must be partitioned");
+
+      PartitioningWriter<InternalRow, DataWriteResult> insertWriter;
+      if (context.fanoutWriterEnabled()) {
+        insertWriter = new FanoutDataWriter<>(
+            writerFactory, dataFileFactory, table.io(),
+            context.dataFileFormat(), context.targetDataFileSize());
+      } else {
+        insertWriter = new ClusteredDataWriter<>(
+            writerFactory, dataFileFactory, table.io(),
+            context.dataFileFormat(), context.targetDataFileSize());
+      }
+
+      // always use a separate fanout data writer for updates as they may be out of order

Review comment:
       Got it. This is what I was looking for.
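
As a rough mental model of why updates need the fanout path (a simplified sketch, not the actual `ClusteredDataWriter`/`FanoutDataWriter` implementations): a clustered writer assumes input arrives grouped by partition and fails when a partition reappears after its file was closed, while a fanout writer keeps an open file per partition and tolerates any input order.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Clustered strategy: one open file at a time; a partition that comes back
// after its file was closed indicates out-of-order input and is an error.
class ClusteredSketch {
  private final Set<String> closedPartitions = new HashSet<>();
  private String currentPartition = null;

  void write(String partition, Object row) {
    if (!partition.equals(currentPartition)) {
      if (closedPartitions.contains(partition)) {
        // mirrors the failure mode for out-of-order input
        throw new IllegalStateException("Partition seen again after close: " + partition);
      }
      if (currentPartition != null) {
        closedPartitions.add(currentPartition);
      }
      currentPartition = partition; // open a new file for this partition
    }
    // append row to the current file
  }
}

// Fanout strategy: one open file per partition seen so far; order is irrelevant,
// at the cost of holding more writers open at once.
class FanoutSketch {
  private final Map<String, StringBuilder> openFiles = new HashMap<>();

  void write(String partition, Object row) {
    openFiles.computeIfAbsent(partition, p -> new StringBuilder()).append(row);
  }
}
```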






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799704833



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1441,6 +1441,132 @@ public void testRangePositionDeltaDeletePartitionedTable() {
         table, DELETE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read UPDATE operations with position deletes
+  // ===================================================================================
+  //
+  // update mode is NOT SET -> CLUSTER BY _spec_id, _partition + LOCALLY ORDER BY _spec_id, _partition, _file, _pos

Review comment:
       Okay, let's keep it for now then.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r792165316



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       This needs to be discussed and I am open to alternatives. With the current change, the API seems a bit confusing, but maybe we can play with the names to make the purpose clear. In short, this method exists to distinguish inserts from updates. In the MERGE case, we order updates by _spec_id, _partition, _file, _pos, so updated records can arrive out of order if the sort key or partition columns change. For inserts, the metadata columns are null, so ordering them is not a problem.
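
For illustration, the caller-side contract reads roughly like this (a hedged sketch; `applyUpdate` is a hypothetical helper, while the writer method signatures match the javadoc above):

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.PositionDeltaWriter;

class UpdateCallerSketch {
  // For each updated row, first delete the old position, then hand the new
  // version to update() rather than insert(), so the writer can route it to a
  // path that tolerates out-of-order partitions.
  static <T> void applyUpdate(PositionDeltaWriter<T> writer,
                              CharSequence oldPath, long oldPos,
                              PartitionSpec oldSpec, StructLike oldPartition,
                              T newRow, PartitionSpec newSpec, StructLike newPartition) {
    writer.delete(oldPath, oldPos, oldSpec, oldPartition);
    writer.update(newRow, newSpec, newPartition);
  }
}
```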






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r792168256



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1441,6 +1441,132 @@ public void testRangePositionDeltaDeletePartitionedTable() {
         table, DELETE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read UPDATE operations with position deletes
+  // ===================================================================================
+  //
+  // update mode is NOT SET -> CLUSTER BY _spec_id, _partition + LOCALLY ORDER BY _spec_id, _partition, _file, _pos

Review comment:
       Same as in merge-on-read DELETE. Here we are dealing with full rows, so it does not seem like a good idea to group everything into a single task for unpartitioned tables. Ideas are welcome.






[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798743404



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       Actually, both insert and update writers appear to be clustered. Do they just have a different clustering?






[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798751849



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1441,6 +1441,132 @@ public void testRangePositionDeltaDeletePartitionedTable() {
         table, DELETE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read UPDATE operations with position deletes
+  // ===================================================================================
+  //
+  // update mode is NOT SET -> CLUSTER BY _spec_id, _partition + LOCALLY ORDER BY _spec_id, _partition, _file, _pos

Review comment:
       I think this is a reasonable default. We don't expect high volume for merge-on-read, nor do we expect unpartitioned tables to be high volume. If they are, then setting an explicit mode=none will fix it.
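
For a high-volume unpartitioned table, that override might look like this (a hedged example, assuming the `write.update.distribution-mode` table property that pairs with this distribution logic; `db.events` is a placeholder table name):

```java
import org.apache.spark.sql.SparkSession;

class UpdateDistributionModeExample {
  // Setting the mode to 'none' skips the _spec_id/_partition clustering
  // described above; 'hash' and 'range' are the other expected values.
  static void disableUpdateDistribution(SparkSession spark) {
    spark.sql("ALTER TABLE db.events SET TBLPROPERTIES ("
        + "'write.update.distribution-mode' = 'none')");
  }
}
```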






[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798753090



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;

Review comment:
       Why have `dataSpec` and `specs` for unpartitioned tables? Is it because this may be reading previously partitioned data?






[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798742982



##########
File path: data/src/test/java/org/apache/iceberg/io/TestPositionDeltaWriters.java
##########
@@ -144,13 +147,16 @@ public void testPositionDeltaDeleteOnly() throws IOException {
     PartitionSpec unpartitionedSpec = table.specs().get(0);
     PartitionSpec partitionedSpec = table.specs().get(1);
 
-    ClusteredDataWriter<T> dataWriter = new ClusteredDataWriter<>(
+    ClusteredDataWriter<T> insertWriter = new ClusteredDataWriter<>(
+        writerFactory, fileFactory, table.io(),
+        fileFormat, TARGET_FILE_SIZE);

Review comment:
       Nit: Unnecessary line wrap?






[GitHub] [iceberg] szehon-ho commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798805693



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
##########
@@ -126,6 +128,33 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     ReplaceData(writeRelation, updatedAndRemainingRowsPlan, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      table: RowLevelOperationTable,
+      assignments: Seq[Assignment],
+      cond: Expression): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, table.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, table.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, table, metadataAttrs, rowIdAttrs)
+
+    // build a plan for updated records that match the cond
+    val matchedRowsPlan = Filter(cond, readRelation)
+    val updatedRowsPlan = buildUpdateProjection(matchedRowsPlan, assignments)
+    val operationType = Alias(Literal(UPDATE_OPERATION), OPERATION_COLUMN)()
+    val project = Project(operationType +: updatedRowsPlan.output, updatedRowsPlan)
+
+    // build a plan to write the row delta to the table
+    val writeRelation = relation.copy(table = table)

Review comment:
       Ah, sorry, I was tricked by the variable shadowing.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799693107



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
##########
@@ -126,6 +128,33 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     ReplaceData(writeRelation, updatedAndRemainingRowsPlan, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      table: RowLevelOperationTable,
+      assignments: Seq[Assignment],
+      cond: Expression): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, table.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, table.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, table, metadataAttrs, rowIdAttrs)
+
+    // build a plan for updated records that match the cond
+    val matchedRowsPlan = Filter(cond, readRelation)
+    val updatedRowsPlan = buildUpdateProjection(matchedRowsPlan, assignments)
+    val operationType = Alias(Literal(UPDATE_OPERATION), OPERATION_COLUMN)()
+    val project = Project(operationType +: updatedRowsPlan.output, updatedRowsPlan)
+
+    // build a plan to write the row delta to the table
+    val writeRelation = relation.copy(table = table)

Review comment:
       I'll try to change the name to make it a bit more obvious. I'll do that in a separate PR to cover all places at once.






[GitHub] [iceberg] aokolnychyi commented on pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#issuecomment-1030421595


   @rdblue, I've introduced a parent class to reduce the code duplication between the unpartitioned and partitioned delta writers, added tests for out-of-order records when we update a partition column, and added a test for static partition pruning. Dynamic pruning for merge-on-read is a little tricky to trigger (we just use the regular scan). I'll give it another try in MERGE.
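
End to end, exercising the new path might look like this (a hedged example; `db.events`, the `status` column, and the predicate are placeholders, and it assumes the `format-version` and `write.update.mode` table properties used by this feature):

```java
import org.apache.spark.sql.SparkSession;

class MergeOnReadUpdateExample {
  static void run(SparkSession spark) {
    // merge-on-read requires a format v2 table, since it relies on delete files
    spark.sql("ALTER TABLE db.events SET TBLPROPERTIES ("
        + "'format-version' = '2', 'write.update.mode' = 'merge-on-read')");
    // this UPDATE now writes position deletes plus new data files instead of
    // rewriting the affected data files (copy-on-write)
    spark.sql("UPDATE db.events SET status = 'expired' WHERE ts < DATE '2021-01-01'");
  }
}
```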






[GitHub] [iceberg] szehon-ho commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799881409



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       Sure, no problem, it is not a blocker.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r792218351



##########
File path: core/src/main/java/org/apache/iceberg/io/PositionDeltaWriter.java
##########
@@ -41,6 +41,21 @@
    */
   void insert(T row, PartitionSpec spec, StructLike partition);
 
+  /**
+   * Inserts a new version of an existing row to the provided spec/partition.
+   * <p>
+   * This method allows writers to distinguish new and updated records. The caller must separately
+   * invoke {@link #delete(CharSequence, long, PartitionSpec, StructLike)} for the original
+   * row position that is being updated.
+   *
+   * @param row a new version of an existing row
+   * @param spec a new partition spec
+   * @param partition a new partition or null if the spec is unpartitioned
+   */
+  default void update(T row, PartitionSpec spec, StructLike partition) {

Review comment:
       Splitting an update into two records (a separate delete and a separate insert) on the engine side is an option to consider, but it would require quite some effort to implement. Ideally, I would avoid that.






[GitHub] [iceberg] szehon-ho commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798156043



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
##########
@@ -126,6 +128,33 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
     ReplaceData(writeRelation, updatedAndRemainingRowsPlan, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      table: RowLevelOperationTable,
+      assignments: Seq[Assignment],
+      cond: Expression): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, table.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, table.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, table, metadataAttrs, rowIdAttrs)
+
+    // build a plan for updated records that match the cond
+    val matchedRowsPlan = Filter(cond, readRelation)
+    val updatedRowsPlan = buildUpdateProjection(matchedRowsPlan, assignments)
+    val operationType = Alias(Literal(UPDATE_OPERATION), OPERATION_COLUMN)()
+    val project = Project(operationType +: updatedRowsPlan.output, updatedRowsPlan)
+
+    // build a plan to write the row delta to the table
+    val writeRelation = relation.copy(table = table)

Review comment:
       Nit: the `table` argument seems unnecessary.






[GitHub] [iceberg] rdblue commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r798756710



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper deletePartitionRowWrapper;
+    private final Map<Integer, StructProjection> deletePartitionProjections;
+    private final int specIdOrdinal;
+    private final int partitionOrdinal;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+
+    private boolean closed = false;
+
+    UnpartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+                             OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory,
+                             Context context) {
+
+      Preconditions.checkArgument(table.spec().isUnpartitioned(), "Data spec must be unpartitioned");
+
+      ClusteredDataWriter<InternalRow> insertWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredDataWriter<InternalRow> updateWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredPositionDeleteWriter<InternalRow> deleteWriter = new ClusteredPositionDeleteWriter<>(
+          writerFactory, deleteFileFactory, table.io(),
+          context.deleteFileFormat(), context.targetDeleteFileSize());
+
+      this.delegate = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
+      this.io = table.io();
+      this.dataSpec = table.spec();
+      this.specs = table.specs();
+
+      Types.StructType partitionType = Partitioning.partitionType(table);
+      this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
+      this.deletePartitionProjections = buildPartitionProjections(partitionType, specs);
+
+      this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
+      this.partitionOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+      this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
+      this.positionOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+    }
+
+    @Override
+    public void delete(InternalRow meta, InternalRow id) throws IOException {
+      int specId = meta.getInt(specIdOrdinal);
+      PartitionSpec spec = specs.get(specId);
+
+      InternalRow partition = meta.getStruct(partitionOrdinal, deletePartitionRowWrapper.size());
+      StructProjection partitionProjection = deletePartitionProjections.get(specId);
+      partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition));
+
+      String file = id.getString(fileOrdinal);
+      long position = id.getLong(positionOrdinal);
+      delegate.delete(file, position, spec, partitionProjection);
+    }
+
+    @Override
+    public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOException {
+      delete(meta, id);
+      delegate.update(row, dataSpec, null);

Review comment:
       `dataSpec` is required here?






[GitHub] [iceberg] singhpk234 commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799329980



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDistributionAndOrderingUtil.java
##########
@@ -146,14 +146,14 @@ private static Distribution buildCopyOnWriteDeleteUpdateDistribution(Table table
 
   public static Distribution buildPositionDeltaDistribution(Table table, Command command,
                                                             DistributionMode distributionMode) {
-    if (command == DELETE) {
-      return positionDeleteDistribution(distributionMode);
+    if (command == DELETE || command == UPDATE) {
+      return buildPositionDeleteUpdateDistribution(distributionMode);
     } else {
       throw new IllegalArgumentException("Only position deletes are currently supported");

Review comment:
       [minor] We could update the exception message here as well to include the `update` command:
   
   ```suggestion
         throw new IllegalArgumentException("Only position deletes and updates are currently supported");
   ```






[GitHub] [iceberg] aokolnychyi commented on pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#issuecomment-1030429954


   Thanks for reviewing, @szehon-ho @rdblue @singhpk234! Issue #4042 remains open.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3984: Spark 3.2: Implement merge-on-read UPDATE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3984:
URL: https://github.com/apache/iceberg/pull/3984#discussion_r799701058



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
##########
@@ -407,6 +428,216 @@ public void close() throws IOException {
     }
   }
 
+  private static class UnpartitionedDeltaWriter extends BaseDeltaWriter {
+    private final PositionDeltaWriter<InternalRow> delegate;
+    private final FileIO io;
+    private final PartitionSpec dataSpec;
+    private final Map<Integer, PartitionSpec> specs;
+    private final InternalRowWrapper deletePartitionRowWrapper;
+    private final Map<Integer, StructProjection> deletePartitionProjections;
+    private final int specIdOrdinal;
+    private final int partitionOrdinal;
+    private final int fileOrdinal;
+    private final int positionOrdinal;
+
+    private boolean closed = false;
+
+    UnpartitionedDeltaWriter(Table table, SparkFileWriterFactory writerFactory,
+                             OutputFileFactory dataFileFactory, OutputFileFactory deleteFileFactory,
+                             Context context) {
+
+      Preconditions.checkArgument(table.spec().isUnpartitioned(), "Data spec must be unpartitioned");
+
+      ClusteredDataWriter<InternalRow> insertWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredDataWriter<InternalRow> updateWriter = new ClusteredDataWriter<>(
+          writerFactory, dataFileFactory, table.io(),
+          context.dataFileFormat(), context.targetDataFileSize());
+
+      ClusteredPositionDeleteWriter<InternalRow> deleteWriter = new ClusteredPositionDeleteWriter<>(
+          writerFactory, deleteFileFactory, table.io(),
+          context.deleteFileFormat(), context.targetDeleteFileSize());
+
+      this.delegate = new BasePositionDeltaWriter<>(insertWriter, updateWriter, deleteWriter);
+      this.io = table.io();
+      this.dataSpec = table.spec();
+      this.specs = table.specs();
+
+      Types.StructType partitionType = Partitioning.partitionType(table);
+      this.deletePartitionRowWrapper = initPartitionRowWrapper(partitionType);
+      this.deletePartitionProjections = buildPartitionProjections(partitionType, specs);
+
+      this.specIdOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.SPEC_ID.name());
+      this.partitionOrdinal = context.metadataSparkType().fieldIndex(MetadataColumns.PARTITION_COLUMN_NAME);
+      this.fileOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.FILE_PATH.name());
+      this.positionOrdinal = context.deleteSparkType().fieldIndex(MetadataColumns.ROW_POSITION.name());
+    }
+
+    @Override
+    public void delete(InternalRow meta, InternalRow id) throws IOException {
+      int specId = meta.getInt(specIdOrdinal);
+      PartitionSpec spec = specs.get(specId);
+
+      InternalRow partition = meta.getStruct(partitionOrdinal, deletePartitionRowWrapper.size());
+      StructProjection partitionProjection = deletePartitionProjections.get(specId);
+      partitionProjection.wrap(deletePartitionRowWrapper.wrap(partition));
+
+      String file = id.getString(fileOrdinal);
+      long position = id.getLong(positionOrdinal);
+      delegate.delete(file, position, spec, partitionProjection);
+    }
+
+    @Override
+    public void update(InternalRow meta, InternalRow id, InternalRow row) throws IOException {
+      delete(meta, id);
+      delegate.update(row, dataSpec, null);

Review comment:
       Yes, the produced `DataFile` should have the correct spec ID.



