You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/10/18 10:44:49 UTC

[iceberg] branch master updated: ORC: Add DeleteWriteBuilder for format v2 (#3250)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b920e2  ORC: Add DeleteWriteBuilder for format v2 (#3250)
1b920e2 is described below

commit 1b920e2945d6fc48661735e884e4f1919e9cdf86
Author: pvary <pv...@cloudera.com>
AuthorDate: Mon Oct 18 12:44:35 2021 +0200

    ORC: Add DeleteWriteBuilder for format v2 (#3250)
---
 .../main/java/org/apache/iceberg/avro/Avro.java    |   4 +-
 .../apache/iceberg/data/BaseFileWriterFactory.java |  33 +++-
 .../java/org/apache/iceberg/data/DeleteFilter.java |  12 ++
 .../iceberg/data/orc/TestOrcRowIterator.java       |   8 +-
 .../apache/iceberg/io/TestFileWriterFactory.java   |  18 ++-
 .../org/apache/iceberg/io/TestWriterMetrics.java   |   3 -
 .../apache/iceberg/flink/data/FlinkOrcWriter.java  |   5 +
 .../iceberg/flink/sink/FlinkFileWriterFactory.java |  11 ++
 .../apache/iceberg/data/orc/GenericOrcReader.java  |   0
 .../apache/iceberg/data/orc/GenericOrcReaders.java |   0
 .../apache/iceberg/data/orc/GenericOrcWriter.java  |   5 +
 .../apache/iceberg/data/orc/GenericOrcWriters.java |  37 +++++
 orc/src/main/java/org/apache/iceberg/orc/ORC.java  | 177 ++++++++++++++++++++-
 .../java/org/apache/iceberg/orc/OrcRowWriter.java  |   3 +
 .../apache/iceberg/orc/TestOrcDeleteWriters.java   |  61 +++----
 .../java/org/apache/iceberg/parquet/Parquet.java   |   6 +-
 .../iceberg/parquet/TestParquetDeleteWriters.java  |   3 +
 .../apache/iceberg/spark/data/SparkOrcWriter.java  |   5 +
 .../spark/source/SparkFileWriterFactory.java       |  11 ++
 .../data/TestSparkOrcReadMetadataColumns.java      |   6 +-
 20 files changed, 357 insertions(+), 51 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index a1d210c..05c18a2 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -459,8 +459,7 @@ public class Avro {
 
     public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
       Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
-      Preconditions.checkArgument(spec != null,
-          "Spec must not be null when creating position delete writer");
+      Preconditions.checkArgument(spec != null, "Spec must not be null when creating position delete writer");
       Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
           "Partition must not be null for partitioned writes");
 
@@ -476,6 +475,7 @@ public class Avro {
       } else {
         appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema());
 
+        // We ignore the 'createWriterFunc' and 'rowSchema' even if is provided, since we do not write row data itself
         appenderBuilder.createWriterFunc(ignored -> new PositionDatumWriter());
       }
 
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
index 62e09e6..3791d34 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
@@ -77,8 +77,9 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
   protected abstract void configureEqualityDelete(Parquet.DeleteWriteBuilder builder);
   protected abstract void configurePositionDelete(Parquet.DeleteWriteBuilder builder);
 
-  // TODO: provide ways to configure ORC delete writers once we support them
   protected abstract void configureDataWrite(ORC.DataWriteBuilder builder);
+  protected abstract void configureEqualityDelete(ORC.DeleteWriteBuilder builder);
+  protected abstract void configurePositionDelete(ORC.DeleteWriteBuilder builder);
 
   @Override
   public DataWriter<T> newDataWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
@@ -184,6 +185,22 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
 
           return parquetBuilder.buildEqualityWriter();
 
+        case ORC:
+          ORC.DeleteWriteBuilder orcBuilder = ORC.writeDeletes(outputFile)
+              .setAll(properties)
+              .metricsConfig(metricsConfig)
+              .rowSchema(equalityDeleteRowSchema)
+              .equalityFieldIds(equalityFieldIds)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(keyMetadata)
+              .withSortOrder(equalityDeleteSortOrder)
+              .overwrite();
+
+          configureEqualityDelete(orcBuilder);
+
+          return orcBuilder.buildEqualityWriter();
+
         default:
           throw new UnsupportedOperationException("Unsupported format for equality deletes: " + deleteFileFormat);
       }
@@ -230,6 +247,20 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
 
           return parquetBuilder.buildPositionWriter();
 
+        case ORC:
+          ORC.DeleteWriteBuilder orcBuilder = ORC.writeDeletes(outputFile)
+              .setAll(properties)
+              .metricsConfig(metricsConfig)
+              .rowSchema(positionDeleteRowSchema)
+              .withSpec(spec)
+              .withPartition(partition)
+              .withKeyMetadata(keyMetadata)
+              .overwrite();
+
+          configurePositionDelete(orcBuilder);
+
+          return orcBuilder.buildPositionWriter();
+
         default:
           throw new UnsupportedOperationException("Unsupported format for position deletes: " + deleteFileFormat);
       }
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 148f9d9..40a77a1 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -34,11 +34,13 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.Deletes;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -227,6 +229,16 @@ public abstract class DeleteFilter<T> {
         return builder.build();
 
       case ORC:
+        // Reusing containers is automatic for ORC. No need to set 'reuseContainers' here.
+        ORC.ReadBuilder orcBuilder = ORC.read(input)
+            .project(deleteSchema)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema));
+
+        if (deleteFile.content() == FileContent.POSITION_DELETES) {
+          orcBuilder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), dataFile.path()));
+        }
+
+        return orcBuilder.build();
       default:
         throw new UnsupportedOperationException(String.format(
             "Cannot read deletes, %s is not a supported format: %s", deleteFile.format().name(), deleteFile.path()));
diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java b/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java
index cde3f1e..ab646b2 100644
--- a/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java
+++ b/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java
@@ -76,10 +76,10 @@ public class TestOrcRowIterator {
         .createWriterFunc(GenericOrcWriter::buildWriter)
         .schema(DATA_SCHEMA)
         // write in such a way that the file contains 2 stripes each with 4 row groups of 1000 rows
-        .config("iceberg.orc.vectorbatch.size", "1000")
-        .config(OrcConf.ROW_INDEX_STRIDE.getAttribute(), "1000")
-        .config(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "4000")
-        .config(OrcConf.STRIPE_SIZE.getAttribute(), "1")
+        .set("iceberg.orc.vectorbatch.size", "1000")
+        .set(OrcConf.ROW_INDEX_STRIDE.getAttribute(), "1000")
+        .set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "4000")
+        .set(OrcConf.STRIPE_SIZE.getAttribute(), "1")
         .build()) {
       writer.addAll(DATA_ROWS);
     }
diff --git a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
index c3c4691..786b542 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
@@ -33,11 +33,13 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -131,8 +133,6 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
 
   @Test
   public void testEqualityDeleteWriter() throws IOException {
-    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
-
     List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
     Schema equalityDeleteRowSchema = table.schema().select("id");
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
@@ -179,7 +179,6 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
 
   @Test
   public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException {
-    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
     Assume.assumeFalse("Table must start unpartitioned", partitioned);
 
     List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
@@ -242,8 +241,6 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
 
   @Test
   public void testPositionDeleteWriter() throws IOException {
-    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
-
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
 
     // write a data file
@@ -288,8 +285,6 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
 
   @Test
   public void testPositionDeleteWriterWithRow() throws IOException {
-    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
-
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), table.schema());
 
     // write a data file
@@ -400,6 +395,15 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
           return ImmutableList.copyOf(records);
         }
 
+      case ORC:
+        try (CloseableIterable<Record> records = ORC.read(inputFile)
+            .project(schema)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
+            .build()) {
+
+          return ImmutableList.copyOf(records);
+        }
+
       default:
         throw new UnsupportedOperationException("Unsupported read file format: " + fileFormat);
     }
diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
index 5909731..f52e422 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
@@ -41,7 +41,6 @@ import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -157,8 +156,6 @@ public abstract class TestWriterMetrics<T> {
 
   @Test
   public void testPositionDeleteMetrics() throws IOException {
-    Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
-
     FileWriterFactory<T> writerFactory = newWriterFactory(SCHEMA);
     EncryptedOutputFile outputFile = fileFactory.newOutputFile();
     PositionDeleteWriter<T> deleteWriter = writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null);
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
index 3f469b7..2eeb268 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
@@ -54,6 +54,11 @@ public class FlinkOrcWriter implements OrcRowWriter<RowData> {
   }
 
   @Override
+  public List<OrcValueWriter<?>> writers() {
+    return writer.writers();
+  }
+
+  @Override
   public Stream<FieldMetrics<?>> metrics() {
     return writer.metrics();
   }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
index 6ce2542..55a9539 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
@@ -109,6 +109,17 @@ class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> implements S
     builder.createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(dataFlinkType(), iSchema));
   }
 
+  @Override
+  protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
+    builder.createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(equalityDeleteFlinkType(), iSchema));
+  }
+
+  @Override
+  protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
+    builder.createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(positionDeleteFlinkType(), iSchema));
+    builder.transformPaths(path -> StringData.fromString(path.toString()));
+  }
+
   private RowType dataFlinkType() {
     if (dataFlinkType == null) {
       Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
similarity index 100%
rename from data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
rename to orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java
similarity index 100%
rename from data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java
rename to orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
similarity index 98%
rename from data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
rename to orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
index 4e0cb77..a136085 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
+++ b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
@@ -119,6 +119,11 @@ public class GenericOrcWriter implements OrcRowWriter<Record> {
   }
 
   @Override
+  public List<OrcValueWriter<?>> writers() {
+    return writer.writers();
+  }
+
+  @Override
   public Stream<FieldMetrics<?>> metrics() {
     return writer.metrics();
   }
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
similarity index 92%
rename from data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
rename to orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
index e0d2c5a..5038d13 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
+++ b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.data.orc;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
@@ -37,6 +38,8 @@ import java.util.stream.Stream;
 import org.apache.iceberg.DoubleFieldMetrics;
 import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.FloatFieldMetrics;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.orc.OrcRowWriter;
 import org.apache.iceberg.orc.OrcValueWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -138,6 +141,11 @@ public class GenericOrcWriters {
     return new MapWriter<>(key, value);
   }
 
+  public static <T> OrcRowWriter<PositionDelete<T>> positionDelete(OrcRowWriter<T> writer,
+      Function<CharSequence, ?> pathTransformFunc) {
+    return new PositionDeleteStructWriter<>(writer, pathTransformFunc);
+  }
+
   private static class BooleanWriter implements OrcValueWriter<Boolean> {
     private static final OrcValueWriter<Boolean> INSTANCE = new BooleanWriter();
 
@@ -479,6 +487,35 @@ public class GenericOrcWriters {
     protected abstract Object get(S struct, int index);
   }
 
+  private static class PositionDeleteStructWriter<T> extends StructWriter<PositionDelete<T>>
+      implements OrcRowWriter<PositionDelete<T>> {
+    private final Function<CharSequence, ?> pathTransformFunc;
+
+    PositionDeleteStructWriter(OrcRowWriter<T> replacedWriter, Function<CharSequence, ?> pathTransformFunc) {
+      super(replacedWriter.writers());
+      this.pathTransformFunc = pathTransformFunc;
+    }
+
+    @Override
+    protected Object get(PositionDelete<T> delete, int index) {
+      switch (index) {
+        case 0:
+          return pathTransformFunc.apply(delete.path());
+        case 1:
+          return delete.pos();
+        case 2:
+          return delete.row();
+      }
+      throw new IllegalArgumentException("Cannot get value for invalid index: " + index);
+    }
+
+    @Override
+    public void write(PositionDelete<T> row, VectorizedRowBatch output) throws IOException {
+      Preconditions.checkArgument(row != null, "value must not be null");
+      writeRow(row, output);
+    }
+  }
+
   private static void growColumnVector(ColumnVector cv, int requestedSize) {
     if (cv.isNull.length < requestedSize) {
       // Use growth factor of 3 to avoid frequent array allocations
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index b326c2f..b4936b1 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -22,9 +22,13 @@ package org.apache.iceberg.orc;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.FileFormat;
@@ -34,6 +38,10 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriters;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
@@ -41,11 +49,13 @@ import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.HadoopOutputFile;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ArrayUtil;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcFile.ReaderOptions;
@@ -87,7 +97,19 @@ public class ORC {
       return this;
     }
 
+    /**
+     * Setting a specific configuration value for the writer.
+     * @param property The property to set
+     * @param value The value to set
+     * @return The resulting builder for chaining purposes
+     * @deprecated Please use #set(String, String) instead
+     */
+    @Deprecated
     public WriteBuilder config(String property, String value) {
+      return set(property, value);
+    }
+
+    public WriteBuilder set(String property, String value) {
       conf.set(property, value);
       return this;
     }
@@ -160,7 +182,7 @@ public class ORC {
     }
 
     public DataWriteBuilder set(String property, String value) {
-      appenderBuilder.config(property, value);
+      appenderBuilder.set(property, value);
       return this;
     }
 
@@ -223,6 +245,159 @@ public class ORC {
     }
   }
 
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc = null;
+    private Schema rowSchema = null;
+    private PartitionSpec spec = null;
+    private StructLike partition = null;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+    private SortOrder sortOrder;
+    private Function<CharSequence, ?> pathTransformFunc = Function.identity();
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      withSpec(table.spec());
+      setAll(table.properties());
+      metricsConfig(MetricsConfig.forTable(table));
+      return this;
+    }
+
+    public DeleteWriteBuilder set(String property, String value) {
+      appenderBuilder.set(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(String property, String value) {
+      appenderBuilder.metadata(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DeleteWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+      appenderBuilder.metricsConfig(newMetricsConfig);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(BiFunction<Schema, TypeDescription, OrcRowWriter<?>> newWriterFunc) {
+      this.createWriterFunc = newWriterFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder rowSchema(Schema newSchema) {
+      this.rowSchema = newSchema;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DeleteWriteBuilder withPartition(StructLike key) {
+      this.partition = key;
+      return this;
+    }
+
+    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+      this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransformFunc) {
+      this.pathTransformFunc = newPathTransformFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
+      this.sortOrder = newSortOrder;
+      return this;
+    }
+
+    public <T> EqualityDeleteWriter<T> buildEqualityWriter() {
+      Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema");
+      Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+      Preconditions.checkState(createWriterFunc != null,
+          "Cannot create equality delete file unless createWriterFunc is set");
+      Preconditions.checkArgument(spec != null, "Spec must not be null when creating equality delete writer");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
+
+      meta("delete-type", "equality");
+      meta("delete-field-ids", IntStream.of(equalityFieldIds)
+          .mapToObj(Objects::toString)
+          .collect(Collectors.joining(", ")));
+
+      // the appender uses the row schema without extra columns
+      appenderBuilder.schema(rowSchema);
+      appenderBuilder.createWriterFunc(createWriterFunc);
+
+      return new EqualityDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.ORC, location, spec, partition, keyMetadata,
+          sortOrder, equalityFieldIds);
+    }
+
+    public <T> PositionDeleteWriter<T> buildPositionWriter() {
+      Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+      Preconditions.checkArgument(spec != null, "Spec must not be null when creating position delete writer");
+      Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
+
+      meta("delete-type", "position");
+
+      if (rowSchema != null && createWriterFunc != null) {
+        Schema deleteSchema = DeleteSchemaUtil.posDeleteSchema(rowSchema);
+        appenderBuilder.schema(deleteSchema);
+
+        appenderBuilder.createWriterFunc((schema, typeDescription) ->
+            GenericOrcWriters.positionDelete(createWriterFunc.apply(deleteSchema, typeDescription), pathTransformFunc));
+      } else {
+        appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema());
+
+        // We ignore the 'createWriterFunc' and 'rowSchema' even if is provided, since we do not write row data itself
+        appenderBuilder.createWriterFunc((schema, typeDescription) -> GenericOrcWriters.positionDelete(
+                GenericOrcWriter.buildWriter(schema, typeDescription), Function.identity()));
+      }
+
+      return new PositionDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.ORC, location, spec, partition, keyMetadata);
+    }
+  }
+
   public static ReadBuilder read(InputFile file) {
     return new ReadBuilder(file);
   }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
index 1f0ea13..0487b9c 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.orc;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.stream.Stream;
 import org.apache.iceberg.FieldMetrics;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
@@ -38,6 +39,8 @@ public interface OrcRowWriter<T> {
    */
   void write(T row, VectorizedRowBatch output) throws IOException;
 
+  List<OrcValueWriter<?>> writers();
+
   /**
    * Returns a stream of {@link FieldMetrics} that this OrcRowWriter keeps track of.
    */
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java b/orc/src/test/java/org/apache/iceberg/orc/TestOrcDeleteWriters.java
similarity index 75%
copy from parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java
copy to orc/src/test/java/org/apache/iceberg/orc/TestOrcDeleteWriters.java
index 77ceec1..5de06a0 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestOrcDeleteWriters.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.orc;
 
 import java.io.File;
 import java.io.IOException;
@@ -31,9 +31,10 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.OutputFile;
@@ -41,17 +42,16 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.types.Types.NestedField;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestParquetDeleteWriters {
+public class TestOrcDeleteWriters {
   private static final Schema SCHEMA = new Schema(
-      NestedField.required(1, "id", Types.LongType.get()),
-      NestedField.optional(2, "data", Types.StringType.get()));
+      Types.NestedField.required(1, "id", Types.LongType.get()),
+      Types.NestedField.optional(2, "data", Types.StringType.get()));
 
   private List<Record> records;
 
@@ -77,8 +77,8 @@ public class TestParquetDeleteWriters {
     File deleteFile = temp.newFile();
 
     OutputFile out = Files.localOutput(deleteFile);
-    EqualityDeleteWriter<Record> deleteWriter = Parquet.writeDeletes(out)
-        .createWriterFunc(GenericParquetWriter::buildWriter)
+    EqualityDeleteWriter<Record> deleteWriter = ORC.writeDeletes(out)
+        .createWriterFunc(GenericOrcWriter::buildWriter)
         .overwrite()
         .rowSchema(SCHEMA)
         .withSpec(PartitionSpec.unpartitioned())
@@ -90,16 +90,16 @@ public class TestParquetDeleteWriters {
     }
 
     DeleteFile metadata = deleteWriter.toDeleteFile();
-    Assert.assertEquals("Format should be Parquet", FileFormat.PARQUET, metadata.format());
+    Assert.assertEquals("Format should be ORC", FileFormat.ORC, metadata.format());
     Assert.assertEquals("Should be equality deletes", FileContent.EQUALITY_DELETES, metadata.content());
     Assert.assertEquals("Record count should be correct", records.size(), metadata.recordCount());
     Assert.assertEquals("Partition should be empty", 0, metadata.partition().size());
     Assert.assertNull("Key metadata should be null", metadata.keyMetadata());
 
     List<Record> deletedRecords;
-    try (CloseableIterable<Record> reader = Parquet.read(out.toInputFile())
+    try (CloseableIterable<Record> reader = ORC.read(out.toInputFile())
         .project(SCHEMA)
-        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(SCHEMA, fileSchema))
+        .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(SCHEMA, fileSchema))
         .build()) {
       deletedRecords = Lists.newArrayList(reader);
     }
@@ -114,24 +114,26 @@ public class TestParquetDeleteWriters {
     Schema deleteSchema = new Schema(
         MetadataColumns.DELETE_FILE_PATH,
         MetadataColumns.DELETE_FILE_POS,
-        NestedField.optional(MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", SCHEMA.asStruct()));
+        Types.NestedField.optional(MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", SCHEMA.asStruct()));
 
-    String deletePath = "s3://bucket/path/file.parquet";
+    String deletePath = "s3://bucket/path/file.orc";
     GenericRecord posDelete = GenericRecord.create(deleteSchema);
     List<Record> expectedDeleteRecords = Lists.newArrayList();
 
     OutputFile out = Files.localOutput(deleteFile);
-    PositionDeleteWriter<Record> deleteWriter = Parquet.writeDeletes(out)
-        .createWriterFunc(GenericParquetWriter::buildWriter)
+    PositionDeleteWriter<Record> deleteWriter = ORC.writeDeletes(out)
+        .createWriterFunc(GenericOrcWriter::buildWriter)
         .overwrite()
         .rowSchema(SCHEMA)
         .withSpec(PartitionSpec.unpartitioned())
         .buildPositionWriter();
 
+    PositionDelete<Record> positionDelete = PositionDelete.create();
     try (PositionDeleteWriter<Record> writer = deleteWriter) {
       for (int i = 0; i < records.size(); i += 1) {
         int pos = i * 3 + 2;
-        writer.delete(deletePath, pos, records.get(i));
+        positionDelete.set(deletePath, pos, records.get(i));
+        writer.write(positionDelete);
         expectedDeleteRecords.add(posDelete.copy(ImmutableMap.of(
             "file_path", deletePath,
             "pos", (long) pos,
@@ -140,16 +142,16 @@ public class TestParquetDeleteWriters {
     }
 
     DeleteFile metadata = deleteWriter.toDeleteFile();
-    Assert.assertEquals("Format should be Parquet", FileFormat.PARQUET, metadata.format());
+    Assert.assertEquals("Format should be ORC", FileFormat.ORC, metadata.format());
     Assert.assertEquals("Should be position deletes", FileContent.POSITION_DELETES, metadata.content());
     Assert.assertEquals("Record count should be correct", records.size(), metadata.recordCount());
     Assert.assertEquals("Partition should be empty", 0, metadata.partition().size());
     Assert.assertNull("Key metadata should be null", metadata.keyMetadata());
 
     List<Record> deletedRecords;
-    try (CloseableIterable<Record> reader = Parquet.read(out.toInputFile())
+    try (CloseableIterable<Record> reader = ORC.read(out.toInputFile())
         .project(deleteSchema)
-        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
+        .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
         .build()) {
       deletedRecords = Lists.newArrayList(reader);
     }
@@ -165,21 +167,26 @@ public class TestParquetDeleteWriters {
         MetadataColumns.DELETE_FILE_PATH,
         MetadataColumns.DELETE_FILE_POS);
 
-    String deletePath = "s3://bucket/path/file.parquet";
+    String deletePath = "s3://bucket/path/file.orc";
     GenericRecord posDelete = GenericRecord.create(deleteSchema);
     List<Record> expectedDeleteRecords = Lists.newArrayList();
 
     OutputFile out = Files.localOutput(deleteFile);
-    PositionDeleteWriter<Void> deleteWriter = Parquet.writeDeletes(out)
-        .createWriterFunc(GenericParquetWriter::buildWriter)
+    PositionDeleteWriter<Void> deleteWriter = ORC.writeDeletes(out)
+        .createWriterFunc(GenericOrcWriter::buildWriter)
         .overwrite()
         .withSpec(PartitionSpec.unpartitioned())
+        .transformPaths(path -> {
+          throw new RuntimeException("Should not be called for performance reasons");
+        })
         .buildPositionWriter();
 
+    PositionDelete<Void> positionDelete = PositionDelete.create();
     try (PositionDeleteWriter<Void> writer = deleteWriter) {
       for (int i = 0; i < records.size(); i += 1) {
         int pos = i * 3 + 2;
-        writer.delete(deletePath, pos, null);
+        positionDelete.set(deletePath, pos, null);
+        writer.write(positionDelete);
         expectedDeleteRecords.add(posDelete.copy(ImmutableMap.of(
             "file_path", deletePath,
             "pos", (long) pos)));
@@ -187,16 +194,16 @@ public class TestParquetDeleteWriters {
     }
 
     DeleteFile metadata = deleteWriter.toDeleteFile();
-    Assert.assertEquals("Format should be Parquet", FileFormat.PARQUET, metadata.format());
+    Assert.assertEquals("Format should be ORC", FileFormat.ORC, metadata.format());
     Assert.assertEquals("Should be position deletes", FileContent.POSITION_DELETES, metadata.content());
     Assert.assertEquals("Record count should be correct", records.size(), metadata.recordCount());
     Assert.assertEquals("Partition should be empty", 0, metadata.partition().size());
     Assert.assertNull("Key metadata should be null", metadata.keyMetadata());
 
     List<Record> deletedRecords;
-    try (CloseableIterable<Record> reader = Parquet.read(out.toInputFile())
+    try (CloseableIterable<Record> reader = ORC.read(out.toInputFile())
         .project(deleteSchema)
-        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
+        .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
         .build()) {
       deletedRecords = Lists.newArrayList(reader);
     }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 8d42fd5..316a363 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -568,7 +568,7 @@ public class Parquet {
     }
 
     public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
-      Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema`");
+      Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema");
       Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
       Preconditions.checkState(createWriterFunc != null,
           "Cannot create equality delete file unless createWriterFunc is set");
@@ -594,8 +594,7 @@ public class Parquet {
 
     public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
       Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
-      Preconditions.checkArgument(spec != null,
-          "Spec must not be null when creating position delete writer");
+      Preconditions.checkArgument(spec != null, "Spec must not be null when creating position delete writer");
       Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
           "Partition must not be null for partitioned writes");
 
@@ -617,6 +616,7 @@ public class Parquet {
       } else {
         appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema());
 
+        // We ignore the 'createWriterFunc' and 'rowSchema' even if is provided, since we do not write row data itself
         appenderBuilder.createWriterFunc(parquetSchema ->
             new PositionDeleteStructWriter<T>((StructWriter<?>) GenericParquetWriter.buildWriter(parquetSchema),
                 Function.identity()));
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java
index 77ceec1..9af0719 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java
@@ -174,6 +174,9 @@ public class TestParquetDeleteWriters {
         .createWriterFunc(GenericParquetWriter::buildWriter)
         .overwrite()
         .withSpec(PartitionSpec.unpartitioned())
+        .transformPaths(path -> {
+          throw new RuntimeException("Should not be called for performance reasons");
+        })
         .buildPositionWriter();
 
     try (PositionDeleteWriter<Void> writer = deleteWriter) {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index 34292f2..9c7f3a6 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -61,6 +61,11 @@ public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
   }
 
   @Override
+  public List<OrcValueWriter<?>> writers() {
+    return writer.writers();
+  }
+
+  @Override
   public Stream<FieldMetrics<?>> metrics() {
     return writer.metrics();
   }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index daff3b9..beaa7c2 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -110,6 +110,17 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
     builder.createWriterFunc(SparkOrcWriter::new);
   }
 
+  @Override
+  protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
+    builder.createWriterFunc(SparkOrcWriter::new);
+  }
+
+  @Override
+  protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
+    builder.createWriterFunc(SparkOrcWriter::new);
+    builder.transformPaths(path -> UTF8String.fromString(path.toString()));
+  }
+
   private StructType dataSparkType() {
     if (dataSparkType == null) {
       Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
index 010f9f5..b8ee563 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
@@ -121,9 +121,9 @@ public class TestSparkOrcReadMetadataColumns {
         .createWriterFunc(SparkOrcWriter::new)
         .schema(DATA_SCHEMA)
         // write in such a way that the file contains 10 stripes each with 100 rows
-        .config("iceberg.orc.vectorbatch.size", "100")
-        .config(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "100")
-        .config(OrcConf.STRIPE_SIZE.getAttribute(), "1")
+        .set("iceberg.orc.vectorbatch.size", "100")
+        .set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "100")
+        .set(OrcConf.STRIPE_SIZE.getAttribute(), "1")
         .build()) {
       writer.addAll(DATA_ROWS);
     }