You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2022/05/12 13:53:24 UTC

[hive] branch master updated: HIVE-26202: Refactor Iceberg Writers (Peter Vary reviewed by Laszlo Pinter) (#3269)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a1906b9f00 HIVE-26202: Refactor Iceberg Writers (Peter Vary reviewed by Laszlo Pinter) (#3269)
a1906b9f00 is described below

commit a1906b9f00a2ac182d10951cbc5e4c30b40aadc9
Author: pvary <pv...@cloudera.com>
AuthorDate: Thu May 12 15:53:16 2022 +0200

    HIVE-26202: Refactor Iceberg Writers (Peter Vary reviewed by Laszlo Pinter) (#3269)
---
 .../mr/hive/HiveIcebergOutputCommitter.java        |  12 +-
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java   |  51 +++-----
 .../apache/iceberg/mr/hive/HiveIcebergSerDe.java   |  22 ++--
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  18 +--
 .../hive/{ => writer}/HiveFileWriterFactory.java   |   6 +-
 .../HiveIcebergBufferedDeleteWriter.java           |  30 +++--
 .../hive/{ => writer}/HiveIcebergDeleteWriter.java |  17 +--
 .../hive/{ => writer}/HiveIcebergRecordWriter.java |  15 ++-
 .../hive/{ => writer}/HiveIcebergUpdateWriter.java |  32 ++---
 .../mr/hive/{ => writer}/HiveIcebergWriter.java    |   9 +-
 .../hive/{ => writer}/HiveIcebergWriterBase.java   |  39 +-----
 .../iceberg/mr/hive/writer/WriterBuilder.java      | 133 +++++++++++++++++++++
 .../iceberg/mr/hive/writer/WriterRegistry.java     |  44 +++++++
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |  34 ++++--
 .../mr/hive/TestHiveIcebergOutputCommitter.java    |  37 ++----
 .../{ => writer}/HiveIcebergWriterTestBase.java    |  14 ++-
 .../{ => writer}/TestHiveIcebergDeleteWriter.java  |  27 +----
 .../{ => writer}/TestHiveIcebergUpdateWriter.java  |  37 ++----
 18 files changed, 340 insertions(+), 237 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 0ea882a450..825ee3dc39 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -61,6 +61,8 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
+import org.apache.iceberg.mr.hive.writer.WriterRegistry;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -107,7 +109,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     TaskAttemptID attemptID = context.getTaskAttemptID();
     JobConf jobConf = context.getJobConf();
     Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
-    Map<String, HiveIcebergWriter> writers = Optional.ofNullable(HiveIcebergWriterBase.getWriters(attemptID))
+    Map<String, HiveIcebergWriter> writers = Optional.ofNullable(WriterRegistry.writers(attemptID))
         .orElseGet(() -> {
           LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", outputs, attemptID);
           return ImmutableMap.of();
@@ -146,7 +148,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     }
 
     // remove the writer to release the object
-    HiveIcebergWriterBase.removeWriters(attemptID);
+    WriterRegistry.removeWriters(attemptID);
   }
 
   /**
@@ -159,7 +161,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
-    Map<String, HiveIcebergWriter> writers = HiveIcebergWriterBase.removeWriters(context.getTaskAttemptID());
+    Map<String, HiveIcebergWriter> writers = WriterRegistry.removeWriters(context.getTaskAttemptID());
 
     // Remove files if it was not done already
     if (writers != null) {
@@ -335,8 +337,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
       if (writeResults.isEmpty()) {
         LOG.info(
-            "Not creating a new commit for table: {}, jobID: {}, isDelete: {}, since there were no new files to add",
-            table, jobContext.getJobID(), HiveIcebergStorageHandler.isDelete(conf, name));
+            "Not creating a new commit for table: {}, jobID: {}, operation: {}, since there were no new files to add",
+            table, jobContext.getJobID(), HiveIcebergStorageHandler.operation(conf, name));
       } else {
         commitWrite(table, startTime, writeResults);
       }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 552bffa236..7bf1b1ec0d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -19,7 +19,6 @@
 
 package org.apache.iceberg.mr.hive;
 
-import java.util.Locale;
 import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,19 +31,17 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
+import org.apache.iceberg.mr.hive.writer.WriterBuilder;
 import org.apache.iceberg.mr.mapred.Container;
-import org.apache.iceberg.util.PropertyUtil;
 
 public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Container<Record>>,
     HiveOutputFormat<NullWritable, Container<Record>> {
+  private static final String DELETE_FILE_THREAD_POOL_SIZE = "iceberg.delete.file.thread.pool.size";
+  private static final int DELETE_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
 
   @Override
   public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class valueClass,
@@ -63,39 +60,19 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
     // Not doing any check.
   }
 
-  private static HiveIcebergWriterBase writer(JobConf jc) {
+  private static HiveIcebergWriter writer(JobConf jc) {
     TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
     // It gets the config from the FileSinkOperator which has its own config for every target table
     Table table = HiveIcebergStorageHandler.table(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME));
-    Schema schema = HiveIcebergStorageHandler.schema(jc);
-    FileFormat fileFormat = FileFormat.valueOf(PropertyUtil.propertyAsString(table.properties(),
-        TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT).toUpperCase(Locale.ENGLISH));
-    long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
-        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    FileIO io = table.io();
-    int partitionId = taskAttemptID.getTaskID().getId();
-    int taskId = taskAttemptID.getId();
-    String operationId = jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID();
-    OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
-        .format(fileFormat)
-        .operationId(operationId)
-        .build();
     String tableName = jc.get(Catalogs.NAME);
-    if (HiveIcebergStorageHandler.isDelete(jc, tableName)) {
-      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, fileFormat, schema, null, fileFormat,
-          null, null, null, schema);
-      return new HiveIcebergDeleteWriter(schema, table.specs(), fileFormat, writerFactory, outputFileFactory, io,
-          targetFileSize, taskAttemptID, tableName);
-    } else if (HiveIcebergStorageHandler.isUpdate(jc, tableName)) {
-      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, fileFormat, schema, null, fileFormat,
-          null, null, null, null);
-      return new HiveIcebergUpdateWriter(schema, table.specs(), table.spec().specId(), fileFormat, writerFactory,
-          outputFileFactory, io, targetFileSize, taskAttemptID, tableName, jc);
-    } else {
-      HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, fileFormat, schema, null, fileFormat,
-          null, null, null, schema);
-      return new HiveIcebergRecordWriter(schema, table.specs(), table.spec().specId(), fileFormat, writerFactory,
-          outputFileFactory, io, targetFileSize, taskAttemptID, tableName, false);
-    }
+    int poolSize = jc.getInt(DELETE_FILE_THREAD_POOL_SIZE, DELETE_FILE_THREAD_POOL_SIZE_DEFAULT);
+
+    return WriterBuilder.builderFor(table)
+        .queryId(jc.get(HiveConf.ConfVars.HIVEQUERYID.varname))
+        .tableName(tableName)
+        .attemptID(taskAttemptID)
+        .poolSize(poolSize)
+        .operation(HiveIcebergStorageHandler.operation(jc, tableName))
+        .build();
   }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index a4470058c7..99820583a1 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -144,15 +145,18 @@ public class HiveIcebergSerDe extends AbstractSerDe {
   }
 
   private static Schema projectedSchema(Configuration configuration, String tableName, Schema tableSchema) {
-    if (HiveIcebergStorageHandler.isDelete(configuration, tableName)) {
-      // when writing delete files, we should use the full delete schema
-      return IcebergAcidUtil.createSerdeSchemaForDelete(tableSchema.columns());
-    } else if (HiveIcebergStorageHandler.isUpdate(configuration, tableName)) {
-      // when writing delete files, we should use the full delete schema
-      return IcebergAcidUtil.createSerdeSchemaForUpdate(tableSchema.columns());
-    } else if (HiveIcebergStorageHandler.isWrite(configuration, tableName)) {
-      // when writing out data, we should not do projection push down
-      return tableSchema;
+    Context.Operation operation = HiveIcebergStorageHandler.operation(configuration, tableName);
+    if (operation != null) {
+      switch (operation) {
+        case DELETE:
+          return IcebergAcidUtil.createSerdeSchemaForDelete(tableSchema.columns());
+        case UPDATE:
+          return IcebergAcidUtil.createSerdeSchemaForUpdate(tableSchema.columns());
+        case OTHER:
+          return tableSchema;
+        default:
+          throw new IllegalArgumentException("Unsupported operation " + operation);
+      }
     } else {
       configuration.setBoolean(InputFormatConfig.CASE_SENSITIVE, false);
       String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(configuration);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 39042691e8..68bd647521 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -556,19 +556,13 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     return false;
   }
 
-  public static boolean isWrite(Configuration conf, String tableName) {
-    return conf != null && tableName != null && Operation.OTHER.name().equals(
-        conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
-  }
-
-  public static boolean isDelete(Configuration conf, String tableName) {
-    return conf != null && tableName != null && Operation.DELETE.name().equals(
-        conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
-  }
+  public static Operation operation(Configuration conf, String tableName) {
+    if (conf == null || tableName == null) {
+      return null;
+    }
 
-  public static boolean isUpdate(Configuration conf, String tableName) {
-    return conf != null && tableName != null && Operation.UPDATE.name().equals(
-        conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
+    String operation = conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName);
+    return operation == null ? null : Operation.valueOf(operation);
   }
 
   /**
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveFileWriterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
similarity index 95%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveFileWriterFactory.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
index 3e1af37347..3459ea5b1e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveFileWriterFactory.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
 
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
@@ -32,9 +32,9 @@ import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 
-public class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
+class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
 
-  protected HiveFileWriterFactory(
+  HiveFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
       Schema dataSchema,
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergBufferedDeleteWriter.java
similarity index 88%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergBufferedDeleteWriter.java
index 99d59341ed..2d91053355 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergBufferedDeleteWriter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -26,7 +26,6 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
@@ -43,6 +42,8 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.PartitioningWriter;
+import org.apache.iceberg.mr.hive.FilesForCommit;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -58,12 +59,9 @@ import org.slf4j.LoggerFactory;
  * we only write out {@link PositionDelete} files where the row data is omitted, so only the filenames and the rowIds
  * have to be in the memory.
  */
-public class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
+class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
   private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergBufferedDeleteWriter.class);
 
-  private static final String DELETE_FILE_THREAD_POOL_SIZE = "iceberg.delete.file.thread.pool.size";
-  private static final int DELETE_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
-
   // Storing deleted data in a map Partition -> FileName -> BitMap
   private final Map<PartitionKey, Map<String, Roaring64Bitmap>> buffer = Maps.newHashMap();
   private final Map<Integer, PartitionSpec> specs;
@@ -73,21 +71,21 @@ public class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
   private final OutputFileFactory fileFactory;
   private final FileIO io;
   private final long targetFileSize;
-  private final Configuration configuration;
+  private final int poolSize;
   private final Record record;
   private final InternalRecordWrapper wrapper;
   private FilesForCommit filesForCommit;
 
-  HiveIcebergBufferedDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileFormat format,
-      FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-      Configuration configuration) {
+  HiveIcebergBufferedDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs,
+      FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory,  FileFormat format, FileIO io,
+      long targetFileSize, int poolSize) {
     this.specs = specs;
     this.format = format;
     this.writerFactory = writerFactory;
     this.fileFactory = fileFactory;
     this.io = io;
     this.targetFileSize = targetFileSize;
-    this.configuration = configuration;
+    this.poolSize = poolSize;
     this.wrapper = new InternalRecordWrapper(schema.asStruct());
     this.record = GenericRecord.create(schema);
   }
@@ -114,7 +112,8 @@ public class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
     Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
     if (!abort) {
       LOG.info("Delete file flush is started");
-      ExecutorService fileExecutor = fileExecutor(configuration, buffer.size());
+      int size = Math.min(buffer.size(), poolSize);
+      ExecutorService fileExecutor = fileExecutor(size);
       try {
         Tasks.foreach(buffer.keySet())
             .retry(3)
@@ -165,13 +164,12 @@ public class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
 
   /**
    * Executor service for parallel writing of delete files.
-   * @param conf The configuration containing the pool size
+   * @param poolSize The pool size
    * @return The generated executor service
    */
-  private static ExecutorService fileExecutor(Configuration conf, int maxSize) {
-    int size = Math.min(maxSize, conf.getInt(DELETE_FILE_THREAD_POOL_SIZE, DELETE_FILE_THREAD_POOL_SIZE_DEFAULT));
+  private static ExecutorService fileExecutor(int poolSize) {
     return Executors.newFixedThreadPool(
-        size,
+        poolSize,
         new ThreadFactoryBuilder()
             .setDaemon(true)
             .setPriority(Thread.NORM_PRIORITY)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
similarity index 85%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
index a31d29249c..368bfe1179 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
@@ -17,13 +17,12 @@
  * under the License.
  */
 
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
@@ -36,17 +35,19 @@ import org.apache.iceberg.io.DeleteWriteResult;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.hive.FilesForCommit;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
 import org.apache.iceberg.mr.mapred.Container;
 
-public class HiveIcebergDeleteWriter extends HiveIcebergWriterBase {
+class HiveIcebergDeleteWriter extends HiveIcebergWriterBase {
 
   private final GenericRecord rowDataTemplate;
 
-  HiveIcebergDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileFormat fileFormat,
-      FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-      TaskAttemptID taskAttemptID, String tableName) {
-    super(schema, specs, io, taskAttemptID, tableName,
-        new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSize), false);
+  HiveIcebergDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs,
+      FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileFormat fileFormat, FileIO io,
+      long targetFileSize) {
+    super(schema, specs, io,
+        new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSize));
     rowDataTemplate = GenericRecord.create(schema);
   }
 
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
similarity index 82%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
index 476b538893..bde57803be 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
@@ -17,13 +17,12 @@
  * under the License.
  */
 
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
@@ -34,18 +33,18 @@ import org.apache.iceberg.io.DataWriteResult;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.hive.FilesForCommit;
 import org.apache.iceberg.mr.mapred.Container;
 
 class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
 
   private final int currentSpecId;
 
-  HiveIcebergRecordWriter(
-      Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId, FileFormat format,
-      FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-      TaskAttemptID taskAttemptID, String tableName, boolean wrapped) {
-    super(schema, specs, io, taskAttemptID, tableName,
-        new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize), wrapped);
+  HiveIcebergRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
+      FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileFormat format, FileIO io,
+      long targetFileSize) {
+    super(schema, specs, io,
+        new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize));
     this.currentSpecId = currentSpecId;
   }
 
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUpdateWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergUpdateWriter.java
similarity index 77%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUpdateWriter.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergUpdateWriter.java
index f234e6b037..4f4154854e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUpdateWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergUpdateWriter.java
@@ -17,15 +17,13 @@
  * under the License.
  */
 
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
@@ -34,11 +32,13 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.ClusteredDataWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.hive.FilesForCommit;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
 import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 
 /**
  * Hive update queries are converted to an insert statement where the result contains the updated rows.
@@ -46,22 +46,19 @@ import org.apache.iceberg.mr.mapred.Container;
  * The rows are sorted based on the requirements of the {@link HiveIcebergRecordWriter}.
  * The {@link HiveIcebergBufferedDeleteWriter} needs to handle out of order records.
  */
-class HiveIcebergUpdateWriter extends HiveIcebergWriterBase {
+class HiveIcebergUpdateWriter implements HiveIcebergWriter {
 
   private final HiveIcebergBufferedDeleteWriter deleteWriter;
   private final HiveIcebergRecordWriter insertWriter;
   private final Container<Record> container;
 
-  HiveIcebergUpdateWriter(
-      Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId, FileFormat format,
-      FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-      TaskAttemptID taskAttemptID, String tableName, Configuration configuration) {
-    super(schema, specs, io, taskAttemptID, tableName,
-        new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize), false);
-    this.deleteWriter = new HiveIcebergBufferedDeleteWriter(schema, specs, format, fileWriterFactory, fileFactory, io,
-        targetFileSize, configuration);
-    this.insertWriter = new HiveIcebergRecordWriter(schema, specs, currentSpecId, format, fileWriterFactory,
-        fileFactory, io, targetFileSize, taskAttemptID, tableName, true);
+  HiveIcebergUpdateWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
+      FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, OutputFileFactory deleteFileFactory,
+      FileFormat format, FileFormat deleteFormat, FileIO io, long targetFileSize, int poolSize) {
+    this.deleteWriter = new HiveIcebergBufferedDeleteWriter(schema, specs, fileWriterFactory, deleteFileFactory,
+        deleteFormat, io, targetFileSize, poolSize);
+    this.insertWriter = new HiveIcebergRecordWriter(schema, specs, currentSpecId, fileWriterFactory, fileFactory,
+        format, io, targetFileSize);
     this.container = new Container<>();
     Record record = GenericRecord.create(schema);
     container.set(record);
@@ -86,4 +83,9 @@ class HiveIcebergUpdateWriter extends HiveIcebergWriterBase {
     Collection<DeleteFile> deleteFiles = deleteWriter.files().deleteFiles();
     return new FilesForCommit(dataFiles, deleteFiles);
   }
+
+  @VisibleForTesting
+  HiveIcebergWriter deleteWriter() {
+    return deleteWriter;
+  }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriter.java
similarity index 79%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriter.java
index 1ea127e023..bf2e17a821 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriter.java
@@ -17,15 +17,19 @@
  * under the License.
  */
 
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
 
 import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.hive.FilesForCommit;
 import org.apache.iceberg.mr.mapred.Container;
 
-public interface HiveIcebergWriter {
+public interface HiveIcebergWriter extends FileSinkOperator.RecordWriter,
+    org.apache.hadoop.mapred.RecordWriter<NullWritable, Container<Record>> {
   FilesForCommit files();
   void close(boolean abort) throws IOException;
   void write(Writable row) throws IOException;
@@ -37,5 +41,4 @@ public interface HiveIcebergWriter {
   default void write(NullWritable key, Container value) throws IOException {
     write(value);
   }
-
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java
similarity index 67%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java
index cd0393f600..f286a0fa90 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java
@@ -17,14 +17,10 @@
  * under the License.
  */
 
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
 
 import java.io.IOException;
 import java.util.Map;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -32,54 +28,29 @@ import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.PartitioningWriter;
-import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.hive.FilesForCommit;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("checkstyle:VisibilityModifier")
-public abstract class HiveIcebergWriterBase implements FileSinkOperator.RecordWriter,
-    org.apache.hadoop.mapred.RecordWriter<NullWritable, Container<Record>>, HiveIcebergWriter {
+abstract class HiveIcebergWriterBase implements HiveIcebergWriter {
   private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergWriterBase.class);
 
-  private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
-
-  static Map<String, HiveIcebergWriter> removeWriters(TaskAttemptID taskAttemptID) {
-    return writers.remove(taskAttemptID);
-  }
-
-  static Map<String, HiveIcebergWriter> getWriters(TaskAttemptID taskAttemptID) {
-    return writers.get(taskAttemptID);
-  }
-
   protected final FileIO io;
   protected final InternalRecordWrapper wrapper;
   protected final Map<Integer, PartitionSpec> specs;
   protected final Map<Integer, PartitionKey> partitionKeys;
   protected final PartitioningWriter writer;
 
-  protected HiveIcebergWriterBase(Schema schema, Map<Integer, PartitionSpec> specs, FileIO io, TaskAttemptID attemptID,
-      String tableName, PartitioningWriter writer, boolean wrapped) {
+  HiveIcebergWriterBase(Schema schema, Map<Integer, PartitionSpec> specs, FileIO io,
+      PartitioningWriter writer) {
     this.io = io;
     this.wrapper = new InternalRecordWrapper(schema.asStruct());
     this.specs = specs;
     this.partitionKeys = Maps.newHashMapWithExpectedSize(specs.size());
     this.writer = writer;
-    if (!wrapped) {
-      writers.putIfAbsent(attemptID, Maps.newConcurrentMap());
-      writers.get(attemptID).put(tableName, this);
-    }
-  }
-
-  @Override
-  public void write(NullWritable key, Container value) throws IOException {
-    write(value);
-  }
-
-  @Override
-  public void close(Reporter reporter) throws IOException {
-    close(false);
   }
 
   @Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
new file mode 100644
index 0000000000..b4b26738c4
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.writer;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.hadoop.hive.ql.Context.Operation;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.util.PropertyUtil;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
+
+public class WriterBuilder {
+  private final Table table;
+  private String tableName;
+  private TaskAttemptID attemptID;
+  private String queryId;
+  private int poolSize;
+  private Operation operation;
+
+  private WriterBuilder(Table table) {
+    this.table = table;
+  }
+
+  public static WriterBuilder builderFor(Table table) {
+    return new WriterBuilder(table);
+  }
+
+  public WriterBuilder tableName(String newTableName) {
+    this.tableName = newTableName;
+    return this;
+  }
+
+  public WriterBuilder attemptID(TaskAttemptID newAttemptID) {
+    this.attemptID = newAttemptID;
+    return this;
+  }
+
+  public WriterBuilder queryId(String newQueryId) {
+    this.queryId = newQueryId;
+    return this;
+  }
+
+  public WriterBuilder poolSize(int newPoolSize) {
+    this.poolSize = newPoolSize;
+    return this;
+  }
+
+  public WriterBuilder operation(Operation newOperation) {
+    this.operation = newOperation;
+    return this;
+  }
+
+  public HiveIcebergWriter build() {
+    Map<String, String> properties = table.properties();
+
+    String dataFileFormatName = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+    FileFormat dataFileFormat = FileFormat.valueOf(dataFileFormatName.toUpperCase(Locale.ENGLISH));
+
+    String deleteFileFormatName = properties.getOrDefault(DELETE_DEFAULT_FILE_FORMAT, dataFileFormatName);
+    FileFormat deleteFileFormat = FileFormat.valueOf(deleteFileFormatName.toUpperCase(Locale.ENGLISH));
+
+    long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+
+    Schema dataSchema = table.schema();
+    FileIO io = table.io();
+    Map<Integer, PartitionSpec> specs = table.specs();
+    int currentSpecId = table.spec().specId();
+    int partitionId = attemptID.getTaskID().getId();
+    int taskId = attemptID.getId();
+    String operationId = queryId + "-" + attemptID.getJobID();
+    OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+        .format(dataFileFormat)
+        .operationId("data-" + operationId)
+        .build();
+
+    OutputFileFactory deleteOutputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+        .format(deleteFileFormat)
+        .operationId("delete-" + operationId)
+        .build();
+
+    Schema positionDeleteRowSchema = operation == Operation.UPDATE ? null : dataSchema;
+    HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, dataFileFormat, dataSchema, null,
+        deleteFileFormat, null, null, null,
+        positionDeleteRowSchema);
+
+    HiveIcebergWriter writer;
+    switch (operation) {
+      case UPDATE:
+        writer = new HiveIcebergUpdateWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
+            deleteOutputFileFactory, dataFileFormat, deleteFileFormat, io, targetFileSize, poolSize);
+        break;
+      case DELETE:
+        writer = new HiveIcebergDeleteWriter(dataSchema, specs, writerFactory, deleteOutputFileFactory,
+            deleteFileFormat, io, targetFileSize);
+        break;
+      default:
+        writer = new HiveIcebergRecordWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
+            dataFileFormat, io, targetFileSize);
+        break;
+    }
+
+    WriterRegistry.registerWriter(attemptID, tableName, writer);
+    return writer;
+  }
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java
new file mode 100644
index 0000000000..dba4973e41
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.writer;
+
+import java.util.Map;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class WriterRegistry {
+  private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
+
+  private WriterRegistry() {
+  }
+
+  public static Map<String, HiveIcebergWriter> removeWriters(TaskAttemptID taskAttemptID) {
+    return writers.remove(taskAttemptID);
+  }
+
+  public static void registerWriter(TaskAttemptID taskAttemptID, String tableName, HiveIcebergWriter writer) {
+    writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
+    writers.get(taskAttemptID).put(tableName, writer);
+  }
+
+  public static Map<String, HiveIcebergWriter> writers(TaskAttemptID taskAttemptID) {
+    return writers.get(taskAttemptID);
+  }
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 82636ff95a..0ccb750fda 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.io.PositionDeleteInfo;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.mapred.JobConf;
@@ -240,6 +241,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
     private T current;
     private CloseableIterator<T> currentIterator;
     private Table table;
+    private boolean updateOrDelete;
 
     @Override
     public void initialize(InputSplit split, TaskAttemptContext newContext) {
@@ -257,6 +259,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       this.inMemoryDataModel = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
               InputFormatConfig.InMemoryDataModel.GENERIC);
       this.currentIterator = open(tasks.next(), expectedSchema).iterator();
+      Operation operation = HiveIcebergStorageHandler.operation(conf, conf.get(Catalogs.NAME));
+      this.updateOrDelete = Operation.DELETE.equals(operation) || Operation.UPDATE.equals(operation);
     }
 
     @Override
@@ -264,8 +268,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       while (true) {
         if (currentIterator.hasNext()) {
           current = currentIterator.next();
-          if (HiveIcebergStorageHandler.isDelete(conf, conf.get(Catalogs.NAME)) ||
-              HiveIcebergStorageHandler.isUpdate(conf, conf.get(Catalogs.NAME))) {
+          if (updateOrDelete) {
             GenericRecord rec = (GenericRecord) current;
             PositionDeleteInfo.setIntoConf(conf,
                 IcebergAcidUtil.parseSpecId(rec),
@@ -508,17 +511,24 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       readSchema = caseSensitive ? table.schema().select(selectedColumns) :
           table.schema().caseInsensitiveSelect(selectedColumns);
 
-      // for DELETE queries, add additional metadata columns into the read schema
-      if (HiveIcebergStorageHandler.isDelete(conf, conf.get(Catalogs.NAME))) {
-        readSchema = IcebergAcidUtil.createFileReadSchemaForDelete(readSchema.columns(), table);
-      }
-
-      // for UPDATE queries, add additional metadata columns into the read schema
-      if (HiveIcebergStorageHandler.isUpdate(conf, conf.get(Catalogs.NAME))) {
-        readSchema = IcebergAcidUtil.createFileReadSchemaForUpdate(readSchema.columns(), table);
+      Operation operation = HiveIcebergStorageHandler.operation(conf, conf.get(Catalogs.NAME));
+      if (operation != null) {
+        switch (operation) {
+          case DELETE:
+            // for DELETE queries, add additional metadata columns into the read schema
+            return IcebergAcidUtil.createFileReadSchemaForDelete(readSchema.columns(), table);
+          case UPDATE:
+            // for UPDATE queries, add additional metadata columns into the read schema
+            return IcebergAcidUtil.createFileReadSchemaForUpdate(readSchema.columns(), table);
+          case OTHER:
+            // for INSERT queries no extra columns are needed
+            return readSchema;
+          default:
+            throw new IllegalArgumentException("Not supported operation " + operation);
+        }
+      } else {
+        return readSchema;
       }
-
-      return readSchema;
     }
 
     private static Schema schemaWithoutConstantsAndMeta(Schema readSchema, Map<Integer, ?> idToConstant) {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 5857d858f5..d0eb3ebc8f 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -35,17 +35,17 @@ import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
+import org.apache.iceberg.mr.hive.writer.WriterBuilder;
+import org.apache.iceberg.mr.hive.writer.WriterRegistry;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -59,11 +59,9 @@ import org.junit.rules.TemporaryFolder;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import static org.apache.iceberg.mr.hive.HiveIcebergWriterBase.getWriters;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class TestHiveIcebergOutputCommitter {
-  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
   private static final int RECORD_NUM = 5;
   private static final String QUERY_ID = "query_id";
   private static final JobID JOB_ID = new JobID("test", 0);
@@ -209,10 +207,10 @@ public class TestHiveIcebergOutputCommitter {
     Assert.assertEquals(1, argumentCaptor.getAllValues().size());
     TaskAttemptID capturedId = TezUtil.taskAttemptWrapper(argumentCaptor.getValue().getTaskAttemptID());
     // writer is still in the map after commitTask failure
-    Assert.assertNotNull(getWriters(capturedId));
+    Assert.assertNotNull(WriterRegistry.writers(capturedId));
     failingCommitter.abortTask(new TaskAttemptContextImpl(conf, capturedId));
     // abortTask succeeds and removes writer
-    Assert.assertNull(getWriters(capturedId));
+    Assert.assertNull(WriterRegistry.writers(capturedId));
   }
 
   private Table table(String location, boolean partitioned) {
@@ -247,7 +245,7 @@ public class TestHiveIcebergOutputCommitter {
 
   /**
    * Write random records to the given table using separate {@link HiveIcebergOutputCommitter} and
-   * a separate {@link HiveIcebergRecordWriter} for every task.
+   * a separate {@link HiveIcebergWriter} for every task.
    * @param name The name of the table to get the table object from the conf
    * @param taskNum The number of tasks in the job handled by the committer
    * @param attemptNum The id used for attempt number generation
@@ -257,14 +255,13 @@ public class TestHiveIcebergOutputCommitter {
    * @param conf The job configuration
    * @param committer The output committer that should be used for committing/aborting the tasks
    * @return The random generated records which were appended to the table
-   * @throws IOException Propagating {@link HiveIcebergRecordWriter} exceptions
+   * @throws IOException Propagating {@link HiveIcebergWriter} exceptions
    */
   private List<Record> writeRecords(String name, int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
                                     JobConf conf, OutputCommitter committer) throws IOException {
     List<Record> expected = Lists.newArrayListWithExpectedSize(RECORD_NUM * taskNum);
 
     Table table = HiveIcebergStorageHandler.table(conf, name);
-    FileIO io = table.io();
     Schema schema = HiveIcebergStorageHandler.schema(conf);
 
     for (int i = 0; i < taskNum; ++i) {
@@ -273,23 +270,15 @@ public class TestHiveIcebergOutputCommitter {
       for (int j = 0; j < RECORD_NUM; ++j) {
         records.get(j).setField("customer_id", j / 3L);
       }
+
       TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, i, attemptNum);
-      int partitionId = taskId.getTaskID().getId();
-      String operationId = QUERY_ID + "-" + JOB_ID;
-      FileFormat fileFormat = FileFormat.PARQUET;
-      OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, attemptNum)
-          .format(fileFormat)
-          .operationId(operationId)
+      HiveIcebergWriter testWriter = WriterBuilder.builderFor(table)
+          .attemptID(TezUtil.taskAttemptWrapper(taskId))
+          .queryId("Q_ID")
+          .tableName(conf.get(Catalogs.NAME))
+          .operation(Context.Operation.OTHER)
           .build();
 
-      HiveFileWriterFactory hfwf = new HiveFileWriterFactory(table, fileFormat, schema,
-          null, fileFormat, null, null, null, null);
-
-
-      HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, table.specs(),
-          table.spec().specId(), fileFormat, hfwf, outputFileFactory, io, TARGET_FILE_SIZE,
-          TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME), false);
-
       Container<Record> container = new Container<>();
 
       for (Record record : records) {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergWriterTestBase.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterTestBase.java
similarity index 91%
rename from iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergWriterTestBase.java
rename to iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterTestBase.java
index 1199391304..1a818a131a 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergWriterTestBase.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterTestBase.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
 
 import java.io.File;
 import java.io.IOException;
@@ -25,6 +25,9 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
@@ -69,6 +72,7 @@ public class HiveIcebergWriterTestBase {
   private final HadoopTables tables = new HadoopTables(new HiveConf());
   private TestHelper helper;
   protected Table table;
+  protected WriterBuilder writerBuilder;
 
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
@@ -108,6 +112,14 @@ public class HiveIcebergWriterTestBase {
     TableOperations ops = ((BaseTable) table).operations();
     TableMetadata meta = ops.current();
     ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    JobID jobId = new JobID("test", 0);
+    TaskAttemptID taskAttemptID =
+        new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), TaskType.MAP, 0, 0);
+    writerBuilder = WriterBuilder.builderFor(table)
+        .attemptID(taskAttemptID)
+        .queryId("Q_ID")
+        .tableName("dummy");
   }
 
   @After
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java
similarity index 78%
rename from iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergDeleteWriter.java
rename to iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java
index 6b6e9c4b1e..9cac3a0262 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java
@@ -17,23 +17,21 @@
  * under the License.
  */
 
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -45,13 +43,11 @@ import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
 public class TestHiveIcebergDeleteWriter extends HiveIcebergWriterTestBase {
-  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
-  private static final JobID JOB_ID = new JobID("test", 0);
   private static final Set<Integer> DELETED_IDS = Sets.newHashSet(29, 61, 89, 100, 122);
 
   @Test
   public void testDelete() throws IOException {
-    HiveIcebergDeleteWriter testWriter = deleteWriter();
+    HiveIcebergWriter testWriter = deleteWriter();
 
     List<GenericRecord> deleteRecords = deleteRecords(table, DELETED_IDS);
 
@@ -99,18 +95,7 @@ public class TestHiveIcebergDeleteWriter extends HiveIcebergWriterTestBase {
     return deleteRecords;
   }
 
-  private HiveIcebergDeleteWriter deleteWriter() {
-    OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, 1, 2)
-        .format(fileFormat)
-        .operationId("3")
-        .build();
-
-    HiveFileWriterFactory hfwf = new HiveFileWriterFactory(table, fileFormat, SCHEMA, null, fileFormat, null, null,
-        null, null);
-
-    TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, 1, 0);
-
-    return new HiveIcebergDeleteWriter(table.schema(), table.specs(), fileFormat, hfwf, outputFileFactory, table.io(),
-        TARGET_FILE_SIZE, taskId, "partitioned");
+  private HiveIcebergWriter deleteWriter() {
+    return writerBuilder.operation(Context.Operation.DELETE).build();
   }
 }
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergUpdateWriter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergUpdateWriter.java
similarity index 77%
rename from iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergUpdateWriter.java
rename to iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergUpdateWriter.java
index 37df7803ab..1ecf7f85f6 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergUpdateWriter.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergUpdateWriter.java
@@ -17,24 +17,21 @@
  * under the License.
  */
 
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -46,10 +43,6 @@ import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
 public class TestHiveIcebergUpdateWriter extends HiveIcebergWriterTestBase {
-  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
-  private static final JobID JOB_ID = new JobID("test", 0);
-  private static final TaskAttemptID TASK_ATTEMPT_ID =
-      new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, 0, 0);
   private static final Map<Integer, GenericRecord> UPDATED_RECORDS = ImmutableMap.of(
       29, record(29, "d"),
       61, record(61, "h"),
@@ -58,15 +51,15 @@ public class TestHiveIcebergUpdateWriter extends HiveIcebergWriterTestBase {
       122, record(142, "k"));
 
   /**
-   * This test just runs sends the data through the DeleteWriter. Here we make sure that the correct rows are removed.
+   * This test just runs sends the data through the deleteWriter. Here we make sure that the correct rows are removed.
    * @throws IOException If here is an error
    */
   @Test
   public void testDelete() throws IOException {
-    HiveIcebergWriter testWriter = new HiveIcebergBufferedDeleteWriter(table.schema(), table.specs(), fileFormat,
-        hiveFileWriterFactory(), outputFileFactory(), table.io(), TARGET_FILE_SIZE, new HiveConf());
+    HiveIcebergWriter updateWriter = writerBuilder.poolSize(10).operation(Context.Operation.UPDATE).build();
+    HiveIcebergWriter deleteWriter = ((HiveIcebergUpdateWriter) updateWriter).deleteWriter();
 
-    update(table, testWriter);
+    update(table, deleteWriter);
 
     StructLikeSet expected = rowSetWithoutIds(RECORDS, UPDATED_RECORDS.keySet());
     StructLikeSet actual = actualRowSet(table);
@@ -80,9 +73,7 @@ public class TestHiveIcebergUpdateWriter extends HiveIcebergWriterTestBase {
    */
   @Test
   public void testUpdate() throws IOException {
-    HiveIcebergWriter testWriter = new HiveIcebergUpdateWriter(table.schema(), table.specs(), table.spec().specId(),
-        fileFormat, hiveFileWriterFactory(), outputFileFactory(), table.io(), TARGET_FILE_SIZE, TASK_ATTEMPT_ID,
-        "table_name", new HiveConf());
+    HiveIcebergWriter testWriter = writerBuilder.poolSize(10).operation(Context.Operation.UPDATE).build();
 
     update(table, testWriter);
 
@@ -93,18 +84,6 @@ public class TestHiveIcebergUpdateWriter extends HiveIcebergWriterTestBase {
     Assert.assertEquals("Table should contain expected rows", expected, actual);
   }
 
-  private OutputFileFactory outputFileFactory() {
-    return OutputFileFactory.builderFor(table, 1, 2)
-        .format(fileFormat)
-        .operationId("3")
-        .build();
-  }
-
-  private HiveFileWriterFactory hiveFileWriterFactory() {
-    return  new HiveFileWriterFactory(table, fileFormat, SCHEMA, null, fileFormat, null, null,
-        null, null);
-  }
-
   private static void update(Table table, HiveIcebergWriter testWriter) throws IOException {
     List<GenericRecord> updateRecords = updateRecords(table, UPDATED_RECORDS);