Posted to issues@iceberg.apache.org by "szehon-ho (via GitHub)" <gi...@apache.org> on 2023/04/04 23:19:51 UTC

[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7029: Spark 3.3: Dataset writes for position deletes

szehon-ho commented on code in PR #7029:
URL: https://github.com/apache/iceberg/pull/7029#discussion_r1157856320


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link
+ * SparkPositionDeletesRewrite}.
+ *
+ * <p>This class is meant to be used for an action to rewrite delete files. Hence, it assumes that
+ * all incoming deletes belong to the same partition and that the incoming dataset comes from
+ * {@link ScanTaskSetManager}.
+ */
+public class SparkPositionDeletesRewriteBuilder implements WriteBuilder {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final SparkWriteConf writeConf;
+  private final LogicalWriteInfo writeInfo;
+  private final StructType dsSchema;
+  private final Schema writeSchema;
+
+  SparkPositionDeletesRewriteBuilder(
+      SparkSession spark, Table table, String branch, LogicalWriteInfo info) {
+    this.spark = spark;
+    this.table = table;
+    this.writeConf = new SparkWriteConf(spark, table, branch, info.options());
+    this.writeInfo = info;
+    this.dsSchema = info.schema();
+    this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive());
+  }
+
+  @Override
+  public Write build() {
+    String fileSetId = writeConf.rewrittenFileSetId();
+
+    Preconditions.checkArgument(
+        fileSetId != null, "position_deletes table can only be written by RewriteDeleteFiles");
+    Preconditions.checkArgument(
+        writeConf.handleTimestampWithoutZone()
+            || !SparkUtil.hasTimestampWithoutZone(table.schema()),
+        SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
+
+    // all files of rewrite group have same partition and spec id
+    ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
+    List<PositionDeletesScanTask> tasks = taskSetManager.fetchTasks(table, fileSetId);
+    Preconditions.checkNotNull(tasks, "No scan tasks found for %s", fileSetId);
+    Preconditions.checkArgument(!tasks.isEmpty(), "No scan tasks found for %s", fileSetId);
+
+    int specId = specId(fileSetId, tasks);
+    StructLike partition = partition(fileSetId, tasks);
+
+    return new SparkPositionDeletesRewrite(
+        spark, table, writeConf, writeInfo, writeSchema, dsSchema, specId, partition);
+  }
+
+  private int specId(String fileSetId, List<PositionDeletesScanTask> tasks) {
+    Set<Integer> specIds = tasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet());
+    Preconditions.checkArgument(
+        specIds.size() == 1,
+        "All scan tasks of %s are expected to have same spec id, but got %s",
+        fileSetId,
+        Joiner.on(",").join(specIds));
+    return tasks.get(0).spec().specId();
+  }
+
+  private StructLike partition(String fileSetId, List<PositionDeletesScanTask> tasks) {
+    StructLikeSet partitions = StructLikeSet.create(tasks.get(0).spec().partitionType());
+    partitions.addAll(tasks.stream().map(ContentScanTask::partition).collect(Collectors.toList()));

Review Comment:
   Done
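
(For readers following the diff: the partition() method above collects partitions into a StructLikeSet built from the spec's partition type because different StructLike implementations may not share equals/hashCode semantics, so a plain HashSet would not reliably deduplicate them; StructLikeSet compares entries field-by-field for the given type. A minimal, self-contained sketch of that behavior — the GenericRecord usage is illustrative, not from this PR:)

    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.StructLikeSet;

    Types.StructType partitionType =
        Types.StructType.of(Types.NestedField.optional(1000, "bucket", Types.IntegerType.get()));

    // StructLikeSet compares entries by field values against the given struct type.
    StructLikeSet partitions = StructLikeSet.create(partitionType);

    GenericRecord first = GenericRecord.create(partitionType);
    first.setField("bucket", 3);
    GenericRecord second = GenericRecord.create(partitionType);
    second.setField("bucket", 3);

    partitions.add(first);
    partitions.add(second);
    // partitions.size() == 1: equal partition tuples collapse to one entry,
    // which is what lets the builder assert a single partition per file set.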



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java:
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.DataWriter;
+import org.apache.spark.sql.connector.write.DataWriterFactory;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * {@link Write} class for rewriting position delete files from Spark. Responsible for creating
+ * {@link PositionDeleteBatchWrite}.
+ *
+ * <p>This class is meant to be used for an action to rewrite position delete files. Hence, it
+ * assumes that all position deletes being rewritten come from {@link ScanTaskSetManager} and that
+ * all share the same partition spec id and partition values.
+ */
+public class SparkPositionDeletesRewrite implements Write {
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final String queryId;
+  private final FileFormat format;
+  private final long targetFileSize;
+  private final Schema writeSchema;
+  private final StructType dsSchema;
+  private final String fileSetId;
+  private final int specId;
+  private final StructLike partition;
+
+  /**
+   * Constructs a {@link SparkPositionDeletesRewrite}.
+   *
+   * @param spark Spark session
+   * @param table instance of {@link PositionDeletesTable}
+   * @param writeConf Spark write config
+   * @param writeInfo Spark write info
+   * @param writeSchema Iceberg output schema
+   * @param dsSchema schema of original incoming position deletes dataset
+   * @param specId spec id of position deletes
+   * @param partition partition value of position deletes
+   */
+  SparkPositionDeletesRewrite(
+      SparkSession spark,
+      Table table,
+      SparkWriteConf writeConf,
+      LogicalWriteInfo writeInfo,
+      Schema writeSchema,
+      StructType dsSchema,
+      int specId,
+      StructLike partition) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.queryId = writeInfo.queryId();
+    this.format = writeConf.deleteFileFormat();
+    this.targetFileSize = writeConf.targetDeleteFileSize();
+    this.writeSchema = writeSchema;
+    this.dsSchema = dsSchema;
+    this.fileSetId = writeConf.rewrittenFileSetId();
+    this.specId = specId;
+    this.partition = partition;
+  }
+
+  @Override
+  public BatchWrite toBatch() {
+    return new PositionDeleteBatchWrite();
+  }
+
+  /** {@link BatchWrite} class for rewriting position delete files from Spark */
+  class PositionDeleteBatchWrite implements BatchWrite {
+
+    @Override
+    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+      // broadcast the table metadata as the writer factory will be sent to executors
+      Broadcast<Table> tableBroadcast =
+          sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+      return new PositionDeletesWriterFactory(
+          tableBroadcast,
+          queryId,
+          format,
+          targetFileSize,
+          writeSchema,
+          dsSchema,
+          specId,
+          partition);
+    }
+
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+      PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get();
+      coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages)));
+    }
+
+    @Override
+    public void abort(WriterCommitMessage[] messages) {
+      SparkCleanupUtil.deleteFiles("job abort", table.io(), files(messages));
+    }
+
+    private List<DeleteFile> files(WriterCommitMessage[] messages) {
+      List<DeleteFile> files = Lists.newArrayList();
+
+      for (WriterCommitMessage message : messages) {
+        if (message != null) {
+          DeleteTaskCommit taskCommit = (DeleteTaskCommit) message;
+          files.addAll(Arrays.asList(taskCommit.files()));
+        }
+      }
+
+      return files;
+    }
+  }
+
+  /**
+   * Writer factory for position deletes metadata table. Responsible for creating {@link
+   * DeleteWriter}.
+   *
+   * <p>This writer is meant to be used for an action to rewrite delete files. Hence, it assumes
+   * that all incoming deletes belong to the same partition and that the incoming dataset comes
+   * from {@link ScanTaskSetManager}.
+   */
+  static class PositionDeletesWriterFactory implements DataWriterFactory {
+    private final Broadcast<Table> tableBroadcast;
+    private final String queryId;
+    private final FileFormat format;
+    private final long targetFileSize;
+    private final Schema writeSchema;
+    private final StructType dsSchema;
+    private final int specId;
+    private final StructLike partition;
+
+    PositionDeletesWriterFactory(
+        Broadcast<Table> tableBroadcast,
+        String queryId,
+        FileFormat format,
+        long targetFileSize,
+        Schema writeSchema,
+        StructType dsSchema,
+        int specId,
+        StructLike partition) {
+      this.tableBroadcast = tableBroadcast;
+      this.queryId = queryId;
+      this.format = format;
+      this.targetFileSize = targetFileSize;
+      this.writeSchema = writeSchema;
+      this.dsSchema = dsSchema;
+      this.specId = specId;
+      this.partition = partition;
+    }
+
+    @Override
+    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+      Table table = tableBroadcast.value();
+
+      OutputFileFactory deleteFileFactory =
+          OutputFileFactory.builderFor(table, partitionId, taskId)
+              .format(format)
+              .operationId(queryId)
+              .suffix("deletes")
+              .build();
+
+      Schema positionDeleteRowSchema = positionDeleteRowSchema();
+      StructType deleteSparkType = deleteSparkType();
+      StructType deleteSparkTypeWithoutRow = deleteSparkTypeWithoutRow();
+
+      SparkFileWriterFactory writerFactoryWithRow =
+          SparkFileWriterFactory.builderFor(table)
+              .dataSchema(writeSchema)

Review Comment:
   Done
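
(Context for the two writer factories built here, one configured with the row schema and one without: a position delete record carries a data file path, a row position, and an optional copy of the deleted row, so deletes that preserve the deleted row need a writer aware of the row schema while the rest do not. A minimal sketch of that record shape — the path and position are hypothetical:)

    import org.apache.iceberg.deletes.PositionDelete;
    import org.apache.spark.sql.catalyst.InternalRow;

    // A position delete names the data file and the row ordinal being deleted;
    // the row payload is optional, which is why a second writer factory
    // without the row schema exists.
    PositionDelete<InternalRow> delete = PositionDelete.create();
    delete.set("s3://bucket/db/tbl/data/file-a.parquet", 17L, null);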



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link
+ * SparkPositionDeletesRewrite}.
+ *
+ * <p>This class is meant to be used for an action to rewrite delete files. Hence, it assumes that
+ * all incoming deletes belong to the same partition and that the incoming dataset comes from
+ * {@link ScanTaskSetManager}.
+ */
+public class SparkPositionDeletesRewriteBuilder implements WriteBuilder {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final SparkWriteConf writeConf;
+  private final LogicalWriteInfo writeInfo;
+  private final StructType dsSchema;
+  private final Schema writeSchema;
+
+  SparkPositionDeletesRewriteBuilder(
+      SparkSession spark, Table table, String branch, LogicalWriteInfo info) {
+    this.spark = spark;
+    this.table = table;
+    this.writeConf = new SparkWriteConf(spark, table, branch, info.options());
+    this.writeInfo = info;
+    this.dsSchema = info.schema();
+    this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive());
+  }
+
+  @Override
+  public Write build() {
+    String fileSetId = writeConf.rewrittenFileSetId();
+
+    Preconditions.checkArgument(
+        fileSetId != null, "position_deletes table can only be written by RewriteDeleteFiles");

Review Comment:
   Yep, done
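
(For context on the precondition above: writes to position_deletes are rejected unless the rewrite action has staged a task set and passed its id back through the write options. A hedged sketch of that round trip — the option strings and the stageTasks signature are assumptions modeled on the existing data-file rewrite path, not taken from this diff:)

    import java.util.List;
    import java.util.UUID;
    import org.apache.iceberg.PositionDeletesScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.ScanTaskSetManager;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    // table is assumed to be the position_deletes metadata table instance.
    static void rewriteGroup(SparkSession spark, Table table, List<PositionDeletesScanTask> tasks) {
      String fileSetId = UUID.randomUUID().toString();

      // Stage the group's tasks so the write path can fetch them by id.
      ScanTaskSetManager.get().stageTasks(table, fileSetId, tasks);

      // Read the staged position deletes back as a dataset
      // ("scan-task-set-id" is an assumed read option name).
      Dataset<Row> posDeletes =
          spark.read()
              .format("iceberg")
              .option("scan-task-set-id", fileSetId)
              .load(table.name());

      // Write them back; without this option the precondition above fails
      // ("rewritten-file-scan-task-set-id" is an assumed write option name).
      posDeletes.write()
          .format("iceberg")
          .option("rewritten-file-scan-task-set-id", fileSetId)
          .mode("append")
          .save(table.name());
    }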



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Builder class for rewrites of position delete files from Spark. Responsible for creating {@link
+ * SparkPositionDeletesRewrite}.
+ *
+ * <p>This class is meant to be used for an action to rewrite delete files. Hence, it assumes that
+ * all incoming deletes belong to the same partition and that the incoming dataset comes from
+ * {@link ScanTaskSetManager}.
+ */
+public class SparkPositionDeletesRewriteBuilder implements WriteBuilder {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final SparkWriteConf writeConf;
+  private final LogicalWriteInfo writeInfo;
+  private final StructType dsSchema;
+  private final Schema writeSchema;
+
+  SparkPositionDeletesRewriteBuilder(
+      SparkSession spark, Table table, String branch, LogicalWriteInfo info) {
+    this.spark = spark;
+    this.table = table;
+    this.writeConf = new SparkWriteConf(spark, table, branch, info.options());
+    this.writeInfo = info;
+    this.dsSchema = info.schema();
+    this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive());
+  }
+
+  @Override
+  public Write build() {
+    String fileSetId = writeConf.rewrittenFileSetId();
+
+    Preconditions.checkArgument(
+        fileSetId != null, "position_deletes table can only be written by RewriteDeleteFiles");
+    Preconditions.checkArgument(
+        writeConf.handleTimestampWithoutZone()
+            || !SparkUtil.hasTimestampWithoutZone(table.schema()),
+        SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
+
+    // all files of rewrite group have same partition and spec id
+    ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
+    List<PositionDeletesScanTask> tasks = taskSetManager.fetchTasks(table, fileSetId);
+    Preconditions.checkNotNull(tasks, "No scan tasks found for %s", fileSetId);
+    Preconditions.checkArgument(!tasks.isEmpty(), "No scan tasks found for %s", fileSetId);
+
+    int specId = specId(fileSetId, tasks);
+    StructLike partition = partition(fileSetId, tasks);
+
+    return new SparkPositionDeletesRewrite(
+        spark, table, writeConf, writeInfo, writeSchema, dsSchema, specId, partition);
+  }
+
+  private int specId(String fileSetId, List<PositionDeletesScanTask> tasks) {
+    Set<Integer> specIds = tasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet());
+    Preconditions.checkArgument(
+        specIds.size() == 1,
+        "All scan tasks of %s are expected to have same spec id, but got %s",
+        fileSetId,
+        Joiner.on(",").join(specIds));
+    return tasks.get(0).spec().specId();
+  }
+
+  private StructLike partition(String fileSetId, List<PositionDeletesScanTask> tasks) {
+    StructLikeSet partitions = StructLikeSet.create(tasks.get(0).spec().partitionType());
+    partitions.addAll(tasks.stream().map(ContentScanTask::partition).collect(Collectors.toList()));
+    Preconditions.checkArgument(
+        partitions.size() == 1,
+        "All scan tasks of %s are expected to have the same partition",

Review Comment:
   Good catch, done
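
(The diff is truncated by the review tool just above; presumably partition() finishes the same way as specId() earlier, reporting the offending values and returning the single partition after the check. A sketch under that assumption, not the PR's actual code:)

    Preconditions.checkArgument(
        partitions.size() == 1,
        "All scan tasks of %s are expected to have the same partition, but got %s",
        fileSetId,
        Joiner.on(",").join(partitions));
    return tasks.get(0).partition();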



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

