You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "flyrain (via GitHub)" <gi...@apache.org> on 2023/05/05 18:41:41 UTC

[GitHub] [iceberg] flyrain commented on a diff in pull request #7388: Spark 3.3: Uniqueness validation when computing updates of changelogs

flyrain commented on code in PR #7388:
URL: https://github.com/apache/iceberg/pull/7388#discussion_r1186400067


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/ChangelogIterator.java:
##########
@@ -18,193 +18,79 @@
  */
 package org.apache.iceberg.spark;
 
-import java.util.Arrays;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 import org.apache.iceberg.ChangelogOperation;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
 import org.apache.spark.sql.types.StructType;
 
-/**
- * An iterator that transforms rows from changelog tables within a single Spark task. It assumes
- * that rows are sorted by identifier columns and change type.
- *
- * <p>It removes the carry-over rows. Carry-over rows are the result of a removal and insertion of
- * the same row within an operation because of the copy-on-write mechanism. For example, given a
- * file which contains row1 (id=1, data='a') and row2 (id=2, data='b'). A copy-on-write delete of
- * row2 would require erasing this file and preserving row1 in a new file. The change-log table
- * would report this as (id=1, data='a', op='DELETE') and (id=1, data='a', op='INSERT'), despite it
- * not being an actual change to the table. The iterator finds the carry-over rows and removes them
- * from the result.
- *
- * <p>This iterator also finds delete/insert rows which represent an update, and converts them into
- * update records. For example, these two rows
- *
- * <ul>
- *   <li>(id=1, data='a', op='DELETE')
- *   <li>(id=1, data='b', op='INSERT')
- * </ul>
- *
- * <p>will be marked as update-rows:
- *
- * <ul>
- *   <li>(id=1, data='a', op='UPDATE_BEFORE')
- *   <li>(id=1, data='b', op='UPDATE_AFTER')
- * </ul>
- */
+/** An iterator that transforms rows from changelog tables within a single Spark task. */
 public class ChangelogIterator implements Iterator<Row> {
-  private static final String DELETE = ChangelogOperation.DELETE.name();
-  private static final String INSERT = ChangelogOperation.INSERT.name();
-  private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name();
-  private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name();
+  protected static final String DELETE = ChangelogOperation.DELETE.name();
+  protected static final String INSERT = ChangelogOperation.INSERT.name();
+  protected static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name();
+  protected static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name();
 
   private final Iterator<Row> rowIterator;
   private final int changeTypeIndex;
-  private final List<Integer> identifierFieldIdx;
-  private final int[] indicesForIdentifySameRow;
 
-  private Row cachedRow = null;
-
-  private ChangelogIterator(
-      Iterator<Row> rowIterator, StructType rowType, String[] identifierFields) {
+  protected ChangelogIterator(Iterator<Row> rowIterator, StructType rowType) {
     this.rowIterator = rowIterator;
     this.changeTypeIndex = rowType.fieldIndex(MetadataColumns.CHANGE_TYPE.name());
-    this.identifierFieldIdx =
-        Arrays.stream(identifierFields)
-            .map(column -> rowType.fieldIndex(column.toString()))
-            .collect(Collectors.toList());
-    this.indicesForIdentifySameRow = generateIndicesForIdentifySameRow(rowType.size());
+  }
+
+  protected int changeTypeIndex() {
+    return changeTypeIndex;
+  }
+
+  protected Iterator<Row> rowIterator() {
+    return rowIterator;
   }
 
   /**
-   * Creates an iterator for records of a changelog table.
+   * Creates an iterator that combines {@link CarryoverRemoveIterator} and {@link
+   * ComputeUpdateIterator} to remove carry-over rows and compute update rows.
    *
    * @param rowIterator the iterator of rows from a changelog table
    * @param rowType the schema of the rows
    * @param identifierFields the names of the identifier columns, which determine if rows are the
    *     same
-   * @return a new {@link ChangelogIterator} instance concatenated with the null-removal iterator
+   * @return a new iterator instance
    */
   public static Iterator<Row> create(

Review Comment:
   Good idea



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org