Posted to commits@hive.apache.org by pv...@apache.org on 2022/05/04 08:50:15 UTC

[hive] branch master updated: HIVE-26183: Create delete writer for the UPDATE statements (Peter Vary reviewed by Adam Szita and Marton Bod) (#3251)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9067431bee HIVE-26183: Create delete writer for the UPDATE statements (Peter Vary reviewed by Adam Szita and Marton Bod) (#3251)
9067431bee is described below

commit 9067431bee431c3f51124e30c6551ed04b2e3d22
Author: pvary <pv...@cloudera.com>
AuthorDate: Wed May 4 10:50:04 2022 +0200

    HIVE-26183: Create delete writer for the UPDATE statements (Peter Vary reviewed by Adam Szita and Marton Bod) (#3251)
---
 iceberg/iceberg-handler/pom.xml                    |   7 +-
 .../org/apache/iceberg/mr/hive/FilesForCommit.java |  18 +-
 .../mr/hive/HiveIcebergBufferedDeleteWriter.java   | 181 +++++++++++++++++++++
 .../iceberg/mr/hive/HiveIcebergDeleteWriter.java   |   4 +-
 .../mr/hive/HiveIcebergOutputCommitter.java        |   6 +-
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java   |   4 +-
 .../iceberg/mr/hive/HiveIcebergRecordWriter.java   |   6 +-
 .../iceberg/mr/hive/HiveIcebergUpdateWriter.java   |  89 ++++++++++
 .../apache/iceberg/mr/hive/HiveIcebergWriter.java  |  84 +---------
 ...ebergWriter.java => HiveIcebergWriterBase.java} |  18 +-
 .../apache/iceberg/mr/hive/IcebergAcidUtil.java    | 108 +++++++++---
 .../org/apache/iceberg/data/IcebergGenerics2.java  | 103 ++++++++++++
 .../java/org/apache/iceberg/mr/TestHelper.java     |  27 +++
 .../iceberg/mr/hive/HiveIcebergWriterTestBase.java | 151 +++++++++++++++++
 .../mr/hive/TestHiveIcebergDeleteWriter.java       | 116 +++++++++++++
 .../mr/hive/TestHiveIcebergOutputCommitter.java    |   4 +-
 .../mr/hive/TestHiveIcebergUpdateWriter.java       | 159 ++++++++++++++++++
 iceberg/pom.xml                                    |   6 +
 18 files changed, 963 insertions(+), 128 deletions(-)

diff --git a/iceberg/iceberg-handler/pom.xml b/iceberg/iceberg-handler/pom.xml
index 6a37fdbf16..20ae1e6ec9 100644
--- a/iceberg/iceberg-handler/pom.xml
+++ b/iceberg/iceberg-handler/pom.xml
@@ -100,11 +100,16 @@
       <artifactId>assertj-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-core</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.roaringbitmap</groupId>
       <artifactId>RoaringBitmap</artifactId>
       <version>0.9.22</version>
-      <scope>test</scope>
     </dependency>
   </dependencies>
   <build>
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
index 0dd490628c..237ef55369 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
@@ -20,8 +20,8 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.iceberg.ContentFile;
@@ -30,19 +30,19 @@ import org.apache.iceberg.DeleteFile;
 
 public class FilesForCommit implements Serializable {
 
-  private final List<DataFile> dataFiles;
-  private final List<DeleteFile> deleteFiles;
+  private final Collection<DataFile> dataFiles;
+  private final Collection<DeleteFile> deleteFiles;
 
-  public FilesForCommit(List<DataFile> dataFiles, List<DeleteFile> deleteFiles) {
+  public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles) {
     this.dataFiles = dataFiles;
     this.deleteFiles = deleteFiles;
   }
 
-  public static FilesForCommit onlyDelete(List<DeleteFile> deleteFiles) {
+  public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles) {
     return new FilesForCommit(Collections.emptyList(), deleteFiles);
   }
 
-  public static FilesForCommit onlyData(List<DataFile> dataFiles) {
+  public static FilesForCommit onlyData(Collection<DataFile> dataFiles) {
     return new FilesForCommit(dataFiles, Collections.emptyList());
   }
 
@@ -50,15 +50,15 @@ public class FilesForCommit implements Serializable {
     return new FilesForCommit(Collections.emptyList(), Collections.emptyList());
   }
 
-  public List<DataFile> dataFiles() {
+  public Collection<DataFile> dataFiles() {
     return dataFiles;
   }
 
-  public List<DeleteFile> deleteFiles() {
+  public Collection<DeleteFile> deleteFiles() {
     return deleteFiles;
   }
 
-  public List<? extends ContentFile> allFiles() {
+  public Collection<? extends ContentFile> allFiles() {
     return Stream.concat(dataFiles.stream(), deleteFiles.stream()).collect(Collectors.toList());
   }
 }
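
Editor's note: the widening from List to Collection matters for the new buffered delete writer below, which accumulates delete files in a concurrent queue from parallel flush tasks. A minimal sketch of the intended use (class and method names other than FilesForCommit and DeleteFile are illustrative):

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.mr.hive.FilesForCommit;

class FilesForCommitSketch {
  // With the signatures widened to Collection, the queue filled concurrently by
  // the flush tasks can be handed over without copying it into a List first.
  static FilesForCommit collect(ConcurrentLinkedQueue<DeleteFile> deleteFiles) {
    return FilesForCommit.onlyDelete(deleteFiles);
  }
}
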
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java
new file mode 100644
index 0000000000..99d59341ed
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningWriter;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.roaringbitmap.longlong.PeekableLongIterator;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link HiveIcebergBufferedDeleteWriter} needs to handle out-of-order records.
+ * We need to keep the incoming records in memory until they are written out. To keep memory consumption minimal,
+ * we only write out {@link PositionDelete} files where the row data is omitted, so only the file names and the row
+ * positions have to be kept in memory.
+ */
+public class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergBufferedDeleteWriter.class);
+
+  private static final String DELETE_FILE_THREAD_POOL_SIZE = "iceberg.delete.file.thread.pool.size";
+  private static final int DELETE_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
+
+  // Storing deleted data in a map Partition -> FileName -> BitMap
+  private final Map<PartitionKey, Map<String, Roaring64Bitmap>> buffer = Maps.newHashMap();
+  private final Map<Integer, PartitionSpec> specs;
+  private final Map<PartitionKey, PartitionSpec> keyToSpec = Maps.newHashMap();
+  private final FileFormat format;
+  private final FileWriterFactory<Record> writerFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+  private final Configuration configuration;
+  private final Record record;
+  private final InternalRecordWrapper wrapper;
+  private FilesForCommit filesForCommit;
+
+  HiveIcebergBufferedDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileFormat format,
+      FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
+      Configuration configuration) {
+    this.specs = specs;
+    this.format = format;
+    this.writerFactory = writerFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+    this.configuration = configuration;
+    this.wrapper = new InternalRecordWrapper(schema.asStruct());
+    this.record = GenericRecord.create(schema);
+  }
+
+  @Override
+  public void write(Writable row) throws IOException {
+    Record rec = ((Container<Record>) row).get();
+    IcebergAcidUtil.populateWithOriginalValues(rec, record);
+    String filePath = IcebergAcidUtil.parseFilePath(rec);
+    int specId = IcebergAcidUtil.parseSpecId(rec);
+
+    Map<String, Roaring64Bitmap> deleteMap =
+        buffer.computeIfAbsent(partition(record, specId), key -> {
+          keyToSpec.put(key, specs.get(specId));
+          return Maps.newHashMap();
+        });
+    Roaring64Bitmap deletes = deleteMap.computeIfAbsent(filePath, unused -> new Roaring64Bitmap());
+    deletes.add(IcebergAcidUtil.parseFilePosition(rec));
+  }
+
+  @Override
+  public void close(boolean abort) throws IOException {
+    long startTime = System.currentTimeMillis();
+    Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
+    if (!abort) {
+      LOG.info("Delete file flush is started");
+      ExecutorService fileExecutor = fileExecutor(configuration, buffer.size());
+      try {
+        Tasks.foreach(buffer.keySet())
+            .retry(3)
+            .executeWith(fileExecutor)
+            .onFailure((partition, exception) -> LOG.info("Failed to write delete file {}", partition, exception))
+            .run(partition -> {
+              PositionDelete<Record> positionDelete = PositionDelete.create();
+              PartitioningWriter writerForFiles;
+              try (PartitioningWriter writer =
+                       new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, format, targetFileSize)) {
+                Map<String, Roaring64Bitmap> deleteRows = buffer.get(partition);
+                for (String filePath : new TreeSet<>(deleteRows.keySet())) {
+                  Roaring64Bitmap deletes = deleteRows.get(filePath);
+                  PeekableLongIterator longIterator = deletes.getLongIterator();
+                  while (longIterator.hasNext()) {
+                    long position = longIterator.next();
+                    positionDelete.set(filePath, position, null);
+                    writer.write(positionDelete, keyToSpec.get(partition), partition);
+                  }
+                }
+                // We need the writer object later to get the generated data files
+                writerForFiles = writer;
+              }
+              deleteFiles.addAll(((DeleteWriteResult) writerForFiles.result()).deleteFiles());
+            }, IOException.class);
+      } finally {
+        fileExecutor.shutdown();
+      }
+    }
+
+    LOG.info("HiveIcebergBufferedDeleteWriter is closed with abort={}. Created {} delete files and it took {} ns.",
+        abort, deleteFiles.size(), System.currentTimeMillis() - startTime);
+    LOG.debug("Delete files written {}", deleteFiles);
+
+    this.filesForCommit = FilesForCommit.onlyDelete(deleteFiles);
+  }
+
+  @Override
+  public FilesForCommit files() {
+    return filesForCommit;
+  }
+
+  protected PartitionKey partition(Record row, int specId) {
+    PartitionKey partitionKey = new PartitionKey(specs.get(specId), specs.get(specId).schema());
+    partitionKey.partition(wrapper.wrap(row));
+    return partitionKey;
+  }
+
+  /**
+   * Executor service for parallel writing of delete files.
+   * @param conf The configuration containing the pool size
+   * @param maxSize The number of partitions to flush, used to cap the pool size
+   * @return The generated executor service
+   */
+  private static ExecutorService fileExecutor(Configuration conf, int maxSize) {
+    int size = Math.min(maxSize, conf.getInt(DELETE_FILE_THREAD_POOL_SIZE, DELETE_FILE_THREAD_POOL_SIZE_DEFAULT));
+    return Executors.newFixedThreadPool(
+        size,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setPriority(Thread.NORM_PRIORITY)
+            .setNameFormat("iceberg-delete-file-pool-%d")
+            .build());
+  }
+}
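
Editor's note: a hedged sketch of the buffering scheme described in the class javadoc above. Per partition the writer keeps a FileName -> Roaring64Bitmap map of row positions, and on close replays the positions in file-path order as PositionDelete entries with the row data omitted. Class and field names below are illustrative; the real logic lives in write() and close() above.

import java.util.Map;
import java.util.TreeSet;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.roaringbitmap.longlong.PeekableLongIterator;
import org.roaringbitmap.longlong.Roaring64Bitmap;

class DeleteBufferSketch {
  // FileName -> row positions to delete; the real writer keeps one such map per partition
  private final Map<String, Roaring64Bitmap> buffer = Maps.newHashMap();

  void add(String filePath, long position) {
    buffer.computeIfAbsent(filePath, unused -> new Roaring64Bitmap()).add(position);
  }

  // Replay buffered positions in file-path order, which is what the clustered
  // position-delete writer expects; row data is left null to keep memory low.
  void replay(PositionDelete<Record> positionDelete) {
    for (String filePath : new TreeSet<>(buffer.keySet())) {
      PeekableLongIterator positions = buffer.get(filePath).getLongIterator();
      while (positions.hasNext()) {
        positionDelete.set(filePath, positions.next(), null);
        // writer.write(positionDelete, spec, partition) in the real close()
      }
    }
  }
}
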
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java
index a03a1ee1dd..a31d29249c 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java
@@ -38,7 +38,7 @@ import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.mapred.Container;
 
-public class HiveIcebergDeleteWriter extends HiveIcebergWriter {
+public class HiveIcebergDeleteWriter extends HiveIcebergWriterBase {
 
   private final GenericRecord rowDataTemplate;
 
@@ -46,7 +46,7 @@ public class HiveIcebergDeleteWriter extends HiveIcebergWriter {
       FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
       TaskAttemptID taskAttemptID, String tableName) {
     super(schema, specs, io, taskAttemptID, tableName,
-        new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSize));
+        new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSize), false);
     rowDataTemplate = GenericRecord.create(schema);
   }
 
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 92dfa1cf78..00edc3bd2e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -107,7 +107,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     TaskAttemptID attemptID = context.getTaskAttemptID();
     JobConf jobConf = context.getJobConf();
     Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
-    Map<String, HiveIcebergWriter> writers = Optional.ofNullable(HiveIcebergWriter.getWriters(attemptID))
+    Map<String, HiveIcebergWriter> writers = Optional.ofNullable(HiveIcebergWriterBase.getWriters(attemptID))
         .orElseGet(() -> {
           LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", outputs, attemptID);
           return ImmutableMap.of();
@@ -146,7 +146,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     }
 
     // remove the writer to release the object
-    HiveIcebergWriter.removeWriters(attemptID);
+    HiveIcebergWriterBase.removeWriters(attemptID);
   }
 
   /**
@@ -159,7 +159,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
-    Map<String, HiveIcebergWriter> writers = HiveIcebergWriter.removeWriters(context.getTaskAttemptID());
+    Map<String, HiveIcebergWriter> writers = HiveIcebergWriterBase.removeWriters(context.getTaskAttemptID());
 
     // Remove files if it was not done already
     if (writers != null) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 8cb378c470..ea3b29b439 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -63,7 +63,7 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
     // Not doing any check.
   }
 
-  private static HiveIcebergWriter writer(JobConf jc) {
+  private static HiveIcebergWriterBase writer(JobConf jc) {
     TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
     // It gets the config from the FileSinkOperator which has its own config for every target table
     Table table = HiveIcebergStorageHandler.table(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME));
@@ -88,7 +88,7 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
           targetFileSize, taskAttemptID, tableName);
     } else {
       return new HiveIcebergRecordWriter(schema, table.specs(), table.spec().specId(), fileFormat, writerFactory,
-          outputFileFactory, io, targetFileSize, taskAttemptID, tableName);
+          outputFileFactory, io, targetFileSize, taskAttemptID, tableName, false);
     }
   }
 }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
index f12b1fda20..476b538893 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
@@ -36,16 +36,16 @@ import org.apache.iceberg.io.FileWriterFactory;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.mapred.Container;
 
-class HiveIcebergRecordWriter extends HiveIcebergWriter {
+class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
 
   private final int currentSpecId;
 
   HiveIcebergRecordWriter(
       Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId, FileFormat format,
       FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-      TaskAttemptID taskAttemptID, String tableName) {
+      TaskAttemptID taskAttemptID, String tableName, boolean wrapped) {
     super(schema, specs, io, taskAttemptID, tableName,
-        new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize));
+        new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize), wrapped);
     this.currentSpecId = currentSpecId;
   }
 
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUpdateWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUpdateWriter.java
new file mode 100644
index 0000000000..f234e6b037
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUpdateWriter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.ClusteredDataWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.mapred.Container;
+
+/**
+ * Hive update queries are converted to an insert statement where the result contains the updated rows.
+ * The schema is defined by {@link IcebergAcidUtil#createFileReadSchemaForUpdate(List, Table)}.
+ * The rows are sorted based on the requirements of the {@link HiveIcebergRecordWriter}.
+ * The {@link HiveIcebergBufferedDeleteWriter} needs to handle out-of-order records.
+ */
+class HiveIcebergUpdateWriter extends HiveIcebergWriterBase {
+
+  private final HiveIcebergBufferedDeleteWriter deleteWriter;
+  private final HiveIcebergRecordWriter insertWriter;
+  private final Container<Record> container;
+
+  HiveIcebergUpdateWriter(
+      Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId, FileFormat format,
+      FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
+      TaskAttemptID taskAttemptID, String tableName, Configuration configuration) {
+    super(schema, specs, io, taskAttemptID, tableName,
+        new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize), false);
+    this.deleteWriter = new HiveIcebergBufferedDeleteWriter(schema, specs, format, fileWriterFactory, fileFactory, io,
+        targetFileSize, configuration);
+    this.insertWriter = new HiveIcebergRecordWriter(schema, specs, currentSpecId, format, fileWriterFactory,
+        fileFactory, io, targetFileSize, taskAttemptID, tableName, true);
+    this.container = new Container<>();
+    Record record = GenericRecord.create(schema);
+    container.set(record);
+  }
+
+  @Override
+  public void write(Writable row) throws IOException {
+    deleteWriter.write(row);
+    IcebergAcidUtil.populateWithNewValues(((Container<Record>) row).get(), container.get());
+    insertWriter.write(container);
+  }
+
+  @Override
+  public void close(boolean abort) throws IOException {
+    deleteWriter.close(abort);
+    insertWriter.close(abort);
+  }
+
+  @Override
+  public FilesForCommit files() {
+    Collection<DataFile> dataFiles = insertWriter.files().dataFiles();
+    Collection<DeleteFile> deleteFiles = deleteWriter.files().deleteFiles();
+    return new FilesForCommit(dataFiles, deleteFiles);
+  }
+}
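
Editor's note: in the Hive runtime the combined result is picked up by HiveIcebergOutputCommitter; the following is only a hedged usage sketch of what committing the update writer's output looks like, mirroring the RowDelta usage in the tests further down.

import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;

class UpdateCommitSketch {
  // The update writer reports both the freshly written data files and the
  // position-delete files; a single RowDelta commits them atomically.
  static void commit(Table table, HiveIcebergWriter updateWriter) {
    RowDelta rowDelta = table.newRowDelta();
    updateWriter.files().dataFiles().forEach(rowDelta::addRows);
    updateWriter.files().deleteFiles().forEach(rowDelta::addDeletes);
    rowDelta.commit();
  }
}
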
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
index 57c13f31cb..1ea127e023 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
@@ -20,90 +20,22 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
-import java.util.Map;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.iceberg.PartitionKey;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.InternalRecordWrapper;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.PartitioningWriter;
 import org.apache.iceberg.mr.mapred.Container;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.util.Tasks;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("checkstyle:VisibilityModifier")
-public abstract class HiveIcebergWriter implements FileSinkOperator.RecordWriter,
-    org.apache.hadoop.mapred.RecordWriter<NullWritable, Container<Record>> {
-  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergWriter.class);
+public interface HiveIcebergWriter {
+  FilesForCommit files();
+  void close(boolean abort) throws IOException;
+  void write(Writable row) throws IOException;
 
-  private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
-
-  static Map<String, HiveIcebergWriter> removeWriters(TaskAttemptID taskAttemptID) {
-    return writers.remove(taskAttemptID);
-  }
-
-  static Map<String, HiveIcebergWriter> getWriters(TaskAttemptID taskAttemptID) {
-    return writers.get(taskAttemptID);
-  }
-
-  protected final FileIO io;
-  protected final InternalRecordWrapper wrapper;
-  protected final Map<Integer, PartitionSpec> specs;
-  protected final Map<Integer, PartitionKey> partitionKeys;
-  protected final PartitioningWriter writer;
-
-  protected HiveIcebergWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileIO io, TaskAttemptID attemptID,
-      String tableName, PartitioningWriter writer) {
-    this.io = io;
-    this.wrapper = new InternalRecordWrapper(schema.asStruct());
-    this.specs = specs;
-    this.partitionKeys = Maps.newHashMapWithExpectedSize(specs.size());
-    this.writer = writer;
-    writers.putIfAbsent(attemptID, Maps.newConcurrentMap());
-    writers.get(attemptID).put(tableName, this);
-  }
-
-  protected abstract FilesForCommit files();
-
-  @Override
-  public void write(NullWritable key, Container value) throws IOException {
-    write(value);
-  }
-
-  @Override
-  public void close(Reporter reporter) throws IOException {
+  default void close(Reporter reporter) throws IOException {
     close(false);
   }
 
-  @Override
-  public void close(boolean abort) throws IOException {
-    writer.close();
-    FilesForCommit result = files();
-
-    // If abort then remove the unnecessary files
-    if (abort) {
-      Tasks.foreach(result.allFiles())
-          .retry(3)
-          .suppressFailureWhenFinished()
-          .onFailure((file, exception) -> LOG.debug("Failed on to remove file {} on abort", file, exception))
-          .run(file -> io.deleteFile(file.path().toString()));
-    }
-
-    LOG.info("HiveIcebergWriter is closed with abort={}. Created {} data files and {} delete files", abort,
-        result.dataFiles().size(), result.deleteFiles().size());
+  default void write(NullWritable key, Container value) throws IOException {
+    write(value);
   }
 
-  protected PartitionKey partition(Record row, int specId) {
-    PartitionKey partitionKey = partitionKeys.computeIfAbsent(specId,
-        id -> new PartitionKey(specs.get(id), specs.get(id).schema()));
-    partitionKey.partition(wrapper.wrap(row));
-    return partitionKey;
-  }
 }
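
Editor's note: HiveIcebergWriter is now a small interface with three abstract methods (write, close(abort), files()) plus default methods matching the Reporter-based close and the key/value write used by the Hadoop record-writer contracts. A minimal, purely illustrative implementation (the class name is hypothetical):

import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.io.Writable;

class NoOpWriterSketch implements HiveIcebergWriter {
  @Override
  public void write(Writable row) throws IOException {
    // a real writer buffers or writes the row; this sketch drops it
  }

  @Override
  public void close(boolean abort) throws IOException {
    // nothing to flush or abort
  }

  @Override
  public FilesForCommit files() {
    return new FilesForCommit(Collections.emptyList(), Collections.emptyList());
  }
}
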
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java
similarity index 88%
copy from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
copy to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java
index 57c13f31cb..cd0393f600 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java
@@ -39,9 +39,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("checkstyle:VisibilityModifier")
-public abstract class HiveIcebergWriter implements FileSinkOperator.RecordWriter,
-    org.apache.hadoop.mapred.RecordWriter<NullWritable, Container<Record>> {
-  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergWriter.class);
+public abstract class HiveIcebergWriterBase implements FileSinkOperator.RecordWriter,
+    org.apache.hadoop.mapred.RecordWriter<NullWritable, Container<Record>>, HiveIcebergWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergWriterBase.class);
 
   private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
 
@@ -59,19 +59,19 @@ public abstract class HiveIcebergWriter implements FileSinkOperator.RecordWriter
   protected final Map<Integer, PartitionKey> partitionKeys;
   protected final PartitioningWriter writer;
 
-  protected HiveIcebergWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileIO io, TaskAttemptID attemptID,
-      String tableName, PartitioningWriter writer) {
+  protected HiveIcebergWriterBase(Schema schema, Map<Integer, PartitionSpec> specs, FileIO io, TaskAttemptID attemptID,
+      String tableName, PartitioningWriter writer, boolean wrapped) {
     this.io = io;
     this.wrapper = new InternalRecordWrapper(schema.asStruct());
     this.specs = specs;
     this.partitionKeys = Maps.newHashMapWithExpectedSize(specs.size());
     this.writer = writer;
-    writers.putIfAbsent(attemptID, Maps.newConcurrentMap());
-    writers.get(attemptID).put(tableName, this);
+    if (!wrapped) {
+      writers.putIfAbsent(attemptID, Maps.newConcurrentMap());
+      writers.get(attemptID).put(tableName, this);
+    }
   }
 
-  protected abstract FilesForCommit files();
-
   @Override
   public void write(NullWritable key, Container value) throws IOException {
     write(value);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
index 2a358f4fe4..faa2ca3ec1 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.mr.hive;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -38,24 +39,24 @@ public class IcebergAcidUtil {
   }
 
   private static final Types.NestedField PARTITION_STRUCT_META_COL = null; // placeholder value in the map
-  private static final Map<Types.NestedField, Integer> DELETE_FILE_READ_META_COLS = Maps.newLinkedHashMap();
+  private static final Map<Types.NestedField, Integer> FILE_READ_META_COLS = Maps.newLinkedHashMap();
 
   static {
-    DELETE_FILE_READ_META_COLS.put(MetadataColumns.SPEC_ID, 0);
-    DELETE_FILE_READ_META_COLS.put(PARTITION_STRUCT_META_COL, 1);
-    DELETE_FILE_READ_META_COLS.put(MetadataColumns.FILE_PATH, 2);
-    DELETE_FILE_READ_META_COLS.put(MetadataColumns.ROW_POSITION, 3);
+    FILE_READ_META_COLS.put(MetadataColumns.SPEC_ID, 0);
+    FILE_READ_META_COLS.put(PARTITION_STRUCT_META_COL, 1);
+    FILE_READ_META_COLS.put(MetadataColumns.FILE_PATH, 2);
+    FILE_READ_META_COLS.put(MetadataColumns.ROW_POSITION, 3);
   }
 
   private static final Types.NestedField PARTITION_HASH_META_COL = Types.NestedField.required(
       MetadataColumns.PARTITION_COLUMN_ID, MetadataColumns.PARTITION_COLUMN_NAME, Types.LongType.get());
-  private static final Map<Types.NestedField, Integer> DELETE_SERDE_META_COLS = Maps.newLinkedHashMap();
+  private static final Map<Types.NestedField, Integer> SERDE_META_COLS = Maps.newLinkedHashMap();
 
   static {
-    DELETE_SERDE_META_COLS.put(MetadataColumns.SPEC_ID, 0);
-    DELETE_SERDE_META_COLS.put(PARTITION_HASH_META_COL, 1);
-    DELETE_SERDE_META_COLS.put(MetadataColumns.FILE_PATH, 2);
-    DELETE_SERDE_META_COLS.put(MetadataColumns.ROW_POSITION, 3);
+    SERDE_META_COLS.put(MetadataColumns.SPEC_ID, 0);
+    SERDE_META_COLS.put(PARTITION_HASH_META_COL, 1);
+    SERDE_META_COLS.put(MetadataColumns.FILE_PATH, 2);
+    SERDE_META_COLS.put(MetadataColumns.ROW_POSITION, 3);
   }
 
   /**
@@ -64,8 +65,8 @@ public class IcebergAcidUtil {
    * @return The schema for reading files, extended with metadata columns needed for deletes
    */
   public static Schema createFileReadSchemaForDelete(List<Types.NestedField> dataCols, Table table) {
-    List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + DELETE_FILE_READ_META_COLS.size());
-    DELETE_FILE_READ_META_COLS.forEach((metaCol, index) -> {
+    List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + FILE_READ_META_COLS.size());
+    FILE_READ_META_COLS.forEach((metaCol, index) -> {
       if (metaCol == PARTITION_STRUCT_META_COL) {
         cols.add(MetadataColumns.metadataColumn(table, MetadataColumns.PARTITION_COLUMN_NAME));
       } else {
@@ -81,23 +82,25 @@ public class IcebergAcidUtil {
    * @return The schema for SerDe operations, extended with metadata columns needed for deletes
    */
   public static Schema createSerdeSchemaForDelete(List<Types.NestedField> dataCols) {
-    List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + DELETE_SERDE_META_COLS.size());
-    DELETE_SERDE_META_COLS.forEach((metaCol, index) -> cols.add(metaCol));
+    List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
+    SERDE_META_COLS.forEach((metaCol, index) -> cols.add(metaCol));
     cols.addAll(dataCols);
     return new Schema(cols);
   }
 
   /**
+   * Based on `rec`, the method creates a position delete object and also populates the data fields of `rowData` with
+   * the field values from `rec`.
    * @param rec The record read by the file scan task, which contains both the metadata fields and the row data fields
    * @param rowData The record object to populate with the rowData fields only
    * @return The position delete object
    */
   public static PositionDelete<Record> getPositionDelete(Record rec, Record rowData) {
     PositionDelete<Record> positionDelete = PositionDelete.create();
-    String filePath = rec.get(DELETE_SERDE_META_COLS.get(MetadataColumns.FILE_PATH), String.class);
-    long filePosition = rec.get(DELETE_SERDE_META_COLS.get(MetadataColumns.ROW_POSITION), Long.class);
+    String filePath = rec.get(SERDE_META_COLS.get(MetadataColumns.FILE_PATH), String.class);
+    long filePosition = rec.get(SERDE_META_COLS.get(MetadataColumns.ROW_POSITION), Long.class);
 
-    int dataOffset = DELETE_SERDE_META_COLS.size(); // position in the rec where the actual row data begins
+    int dataOffset = SERDE_META_COLS.size(); // position in the rec where the actual row data begins
     for (int i = dataOffset; i < rec.size(); ++i) {
       rowData.set(i - dataOffset, rec.get(i));
     }
@@ -106,22 +109,85 @@ public class IcebergAcidUtil {
     return positionDelete;
   }
 
+  /**
+   * @param dataCols The columns of the original file read schema
+   * @param table The table object - it is used for populating the partition struct meta column
+   * @return The schema for reading files, extended with the metadata and old-value columns needed for updates
+   */
+  public static Schema createFileReadSchemaForUpdate(List<Types.NestedField> dataCols, Table table) {
+    List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
+    SERDE_META_COLS.forEach((metaCol, index) -> {
+      if (metaCol == PARTITION_STRUCT_META_COL) {
+        cols.add(MetadataColumns.metadataColumn(table, MetadataColumns.PARTITION_COLUMN_NAME));
+      } else {
+        cols.add(metaCol);
+      }
+    });
+    // New column values
+    cols.addAll(dataCols);
+    // Old column values
+    cols.addAll(dataCols.stream()
+        .map(f -> Types.NestedField.optional(1147483545 + f.fieldId(), "__old_value_for" + f.name(), f.type()))
+        .collect(Collectors.toList()));
+    return new Schema(cols);
+  }
+
+  /**
+   * @param dataCols The columns of the serde projection schema
+   * @return The schema for SerDe operations, extended with the metadata and old-value columns needed for updates
+   */
+  public static Schema createSerdeSchemaForUpdate(List<Types.NestedField> dataCols) {
+    List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
+    SERDE_META_COLS.forEach((metaCol, index) -> cols.add(metaCol));
+    // New column values
+    cols.addAll(dataCols);
+    // Old column values
+    cols.addAll(dataCols.stream()
+        .map(f -> Types.NestedField.optional(1147483545 + f.fieldId(), "__old_value_for_" + f.name(), f.type()))
+        .collect(Collectors.toList()));
+    return new Schema(cols);
+  }
+
+  /**
+   * Get the original record from the updated record. Populate the `original` with the field values from `rec`.
+   * @param rec The record read by the file scan task, which contains both the metadata fields and the row data fields
+   * @param original The record object to populate. The end result is the original record before the update.
+   */
+  public static void populateWithOriginalValues(Record rec, Record original) {
+    int dataOffset = SERDE_META_COLS.size() + original.size();
+    for (int i = dataOffset; i < dataOffset + original.size(); ++i) {
+      original.set(i - dataOffset, rec.get(i));
+    }
+  }
+
+  /**
+   * Get the new record from the updated record. Populate the `newRecord` with the field values from `rec`.
+   * @param rec The record read by the file scan task, which contains both the metadata fields and the row data fields
+   * @param newRecord The record object to populate. The end result is the new record after the update.
+   */
+  public static void populateWithNewValues(Record rec, Record newRecord) {
+    int dataOffset = SERDE_META_COLS.size();
+    for (int i = dataOffset; i < dataOffset + newRecord.size(); ++i) {
+      newRecord.set(i - dataOffset, rec.get(i));
+    }
+  }
+
   public static int parseSpecId(Record rec) {
-    return rec.get(DELETE_FILE_READ_META_COLS.get(MetadataColumns.SPEC_ID), Integer.class);
+    return rec.get(FILE_READ_META_COLS.get(MetadataColumns.SPEC_ID), Integer.class);
   }
 
   public static long computePartitionHash(Record rec) {
-    StructProjection part = rec.get(DELETE_FILE_READ_META_COLS.get(PARTITION_STRUCT_META_COL), StructProjection.class);
+    StructProjection part = rec.get(FILE_READ_META_COLS.get(PARTITION_STRUCT_META_COL), StructProjection.class);
     // we need to compute a hash value for the partition struct so that it can be used as a sorting key
     return computeHash(part);
   }
 
   public static String parseFilePath(Record rec) {
-    return rec.get(DELETE_FILE_READ_META_COLS.get(MetadataColumns.FILE_PATH), String.class);
+    return rec.get(FILE_READ_META_COLS.get(MetadataColumns.FILE_PATH), String.class);
   }
 
   public static long parseFilePosition(Record rec) {
-    return rec.get(DELETE_FILE_READ_META_COLS.get(MetadataColumns.ROW_POSITION), Long.class);
+    return rec.get(FILE_READ_META_COLS.get(MetadataColumns.ROW_POSITION), Long.class);
   }
 
   private static long computeHash(StructProjection struct) {
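
Editor's note: a hedged sketch of the UPDATE record layout these helpers imply. With the four metadata columns first, a table with N data columns produces rows of the form [meta 0..3 | new values 4..3+N | old values 4+N..3+2N]; populateWithNewValues copies the first block and populateWithOriginalValues the second. The helper class below is illustrative only.

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;

class UpdateRecordLayoutSketch {
  static void split(Record updateRow, Schema tableSchema) {
    Record newValues = GenericRecord.create(tableSchema);
    Record oldValues = GenericRecord.create(tableSchema);

    // copies updateRow positions 4 .. 4+N-1
    IcebergAcidUtil.populateWithNewValues(updateRow, newValues);
    // copies updateRow positions 4+N .. 4+2N-1
    IcebergAcidUtil.populateWithOriginalValues(updateRow, oldValues);

    // newValues is what HiveIcebergRecordWriter inserts; oldValues drives the
    // partition key of the position delete, while file path and row position
    // come from the metadata columns via parseFilePath/parseFilePosition.
  }
}
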
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/IcebergGenerics2.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/IcebergGenerics2.java
new file mode 100644
index 0000000000..fc87302591
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/IcebergGenerics2.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+public class IcebergGenerics2 {
+  private IcebergGenerics2() {
+  }
+
+  /**
+   * Returns a builder to configure a read of the given table that produces generic records.
+   *
+   * @param table an Iceberg table
+   * @return a builder to configure the scan
+   */
+  public static ScanBuilder read(Table table) {
+    return new ScanBuilder(table);
+  }
+
+  public static class ScanBuilder {
+    private TableScan tableScan;
+    private boolean reuseContainers = false;
+
+    public ScanBuilder(Table table) {
+      this.tableScan = table.newScan();
+    }
+
+    public ScanBuilder reuseContainers() {
+      this.reuseContainers = true;
+      return this;
+    }
+
+    public ScanBuilder where(Expression rowFilter) {
+      this.tableScan = tableScan.filter(rowFilter);
+      return this;
+    }
+
+    public ScanBuilder caseInsensitive() {
+      this.tableScan = tableScan.caseSensitive(false);
+      return this;
+    }
+
+    public ScanBuilder select(String... selectedColumns) {
+      this.tableScan = tableScan.select(ImmutableList.copyOf(selectedColumns));
+      return this;
+    }
+
+    public ScanBuilder project(Schema schema) {
+      this.tableScan = tableScan.project(schema);
+      return this;
+    }
+
+    public ScanBuilder useSnapshot(long scanSnapshotId) {
+      this.tableScan = tableScan.useSnapshot(scanSnapshotId);
+      return this;
+    }
+
+    public ScanBuilder asOfTime(long scanTimestampMillis) {
+      this.tableScan = tableScan.asOfTime(scanTimestampMillis);
+      return this;
+    }
+
+    public ScanBuilder appendsBetween(long fromSnapshotId, long toSnapshotId) {
+      this.tableScan = tableScan.appendsBetween(fromSnapshotId, toSnapshotId);
+      return this;
+    }
+
+    public ScanBuilder appendsAfter(long fromSnapshotId) {
+      this.tableScan = tableScan.appendsAfter(fromSnapshotId);
+      return this;
+    }
+
+    public CloseableIterable<Record> build() {
+      return new TableScanIterable(
+          tableScan,
+          reuseContainers
+      );
+    }
+  }
+}
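
Editor's note: IcebergGenerics2 is a test-only copy of Iceberg's generic reader builder, placed in org.apache.iceberg.data so the tests can scan a table (optionally pinned to a snapshot) into Record objects. A hedged usage sketch, mirroring readRecords() in the test base below:

import java.io.IOException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics2;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

class GenericReadSketch {
  // Reads every row of the table with the given projection and counts them.
  static long countRows(Table table, Schema projection) throws IOException {
    long count = 0;
    try (CloseableIterable<Record> rows = IcebergGenerics2.read(table).project(projection).build()) {
      for (Record ignored : rows) {
        count++;
      }
    }
    return count;
  }
}
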
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
index c7170420bb..25c04aebfd 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
@@ -40,6 +41,7 @@ import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.junit.rules.TemporaryFolder;
 
@@ -120,6 +122,31 @@ public class TestHelper {
     appender().appendToTable(partition, records);
   }
 
+  /**
+   * Appends the rows to the table. If the table is partitioned then it will create the correct partitions.
+   * @param rowSet The rows to add
+   * @throws IOException If there is an exception during writing out the files
+   */
+  public void appendToTable(List<Record> rowSet) throws IOException {
+    // The rows collected by partitions
+    Map<PartitionKey, List<Record>> rows = Maps.newHashMap();
+    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+    for (Record record : rowSet) {
+      partitionKey.partition(record);
+      List<Record> partitionRows = rows.get(partitionKey);
+      if (partitionRows == null) {
+        partitionRows = Lists.newArrayList();
+        rows.put(partitionKey.copy(), partitionRows);
+      }
+
+      partitionRows.add(record);
+    }
+
+    for (PartitionKey partition : rows.keySet()) {
+      appendToTable(partition, rows.get(partition));
+    }
+  }
+
   public DataFile writeFile(StructLike partition, List<Record> records) throws IOException {
     return appender().writeFile(partition, records);
   }
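
Editor's note: a hedged usage sketch of the new appendToTable(List<Record>) overload, assuming the two-column id/data schema used by the writer tests below (the sketch class itself is hypothetical):

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.TestHelper;

class AppendSketch {
  // Builds a few rows against the table schema and appends them in one call;
  // the helper groups them into the correct partitions internally.
  static void seed(TestHelper helper, Schema schema) throws IOException {
    List<Record> rows = TestHelper.RecordsBuilder.newInstance(schema)
        .add(1, "a")
        .add(2, "b")
        .build();
    helper.appendToTable(rows);
  }
}
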
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergWriterTestBase.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergWriterTestBase.java
new file mode 100644
index 0000000000..1199391304
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergWriterTestBase.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics2;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.Parameterized;
+
+public class HiveIcebergWriterTestBase {
+  // Schema passed to create tables
+  public static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.IntegerType.get()),
+      Types.NestedField.required(2, "data", Types.StringType.get())
+  );
+
+  public static final List<Record> RECORDS = TestHelper.RecordsBuilder.newInstance(SCHEMA)
+      .add(29, "a")
+      .add(43, "b")
+      .add(61, "c")
+      .add(89, "d")
+      .add(100, "e")
+      .add(121, "f")
+      .add(122, "g")
+      .build();
+
+  private final HadoopTables tables = new HadoopTables(new HiveConf());
+  private TestHelper helper;
+  protected Table table;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Parameterized.Parameter(0)
+  public FileFormat fileFormat;
+
+  @Parameterized.Parameter(1)
+  public boolean partitioned;
+
+  @Parameterized.Parameters(name = "fileFormat={0}, partitioned={1}")
+  public static Collection<Object[]> parameters() {
+    return Lists.newArrayList(new Object[][] {
+        { FileFormat.PARQUET, true },
+        { FileFormat.ORC, true },
+        { FileFormat.AVRO, true },
+        { FileFormat.PARQUET, false },
+// Skip this until the ORC reader is fixed - test only issue
+//        { FileFormat.ORC, false },
+        { FileFormat.AVRO, false }
+    });
+  }
+
+  @Before
+  public void init() throws IOException {
+    File location = temp.newFolder(fileFormat.name());
+    Assert.assertTrue(location.delete());
+
+    PartitionSpec spec = !partitioned ? PartitionSpec.unpartitioned() :
+        PartitionSpec.builderFor(SCHEMA)
+            .bucket("data", 3)
+            .build();
+    this.helper = new TestHelper(new HiveConf(), tables, location.toString(), SCHEMA, spec, fileFormat, temp);
+    this.table = helper.createTable();
+    helper.appendToTable(RECORDS);
+
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+  }
+
+  @After
+  public void cleanUp() {
+    tables.dropTable(helper.table().location());
+  }
+
+  protected StructLikeSet rowSetWithoutIds(List<Record> records, Set<Integer> idToDelete) {
+    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+    records.stream()
+        .filter(row -> !idToDelete.contains(row.getField("id")))
+        .forEach(set::add);
+    return set;
+  }
+
+  protected static List<GenericRecord> readRecords(Table table, Schema schema) throws IOException {
+    List<GenericRecord> records = Lists.newArrayList();
+    try (CloseableIterable<Record> reader = IcebergGenerics2.read(table).project(schema).build()) {
+      // For these tests we can be sure that the records are GenericRecords, and we need it to easily access fields
+      reader.forEach(record -> records.add((GenericRecord) record));
+    }
+
+    return records;
+  }
+
+  protected static StructLikeSet actualRowSet(Table table) throws IOException {
+    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+    readRecords(table, table.schema()).forEach(set::add);
+    return set;
+  }
+
+  protected static Schema schemaWithMeta(Table table) {
+    List<Types.NestedField> cols = Lists.newArrayListWithCapacity(table.schema().columns().size() + 4);
+    cols.addAll(table.schema().columns());
+    cols.add(MetadataColumns.ROW_POSITION);
+    cols.add(MetadataColumns.FILE_PATH);
+    cols.add(MetadataColumns.SPEC_ID);
+    cols.add(MetadataColumns.metadataColumn(table, MetadataColumns.PARTITION_COLUMN_NAME));
+    return new Schema(cols);
+  }
+}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergDeleteWriter.java
new file mode 100644
index 0000000000..6b6e9c4b1e
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergDeleteWriter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestHiveIcebergDeleteWriter extends HiveIcebergWriterTestBase {
+  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
+  private static final JobID JOB_ID = new JobID("test", 0);
+  private static final Set<Integer> DELETED_IDS = Sets.newHashSet(29, 61, 89, 100, 122);
+
+  @Test
+  public void testDelete() throws IOException {
+    HiveIcebergDeleteWriter testWriter = deleteWriter();
+
+    List<GenericRecord> deleteRecords = deleteRecords(table, DELETED_IDS);
+
+    Collections.sort(deleteRecords,
+        Comparator.comparing(a -> a.getField(MetadataColumns.PARTITION_COLUMN_NAME).toString()));
+
+    Container<Record> container = new Container<>();
+    for (Record deleteRecord : deleteRecords) {
+      container.set(deleteRecord);
+      testWriter.write(container);
+    }
+
+    testWriter.close(false);
+
+    RowDelta rowDelta = table.newRowDelta();
+    testWriter.files().deleteFiles().forEach(rowDelta::addDeletes);
+    rowDelta.commit();
+
+    StructLikeSet expected = rowSetWithoutIds(RECORDS, DELETED_IDS);
+    StructLikeSet actual = actualRowSet(table);
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  private static List<GenericRecord> deleteRecords(Table table, Set<Integer> idsToRemove)
+      throws IOException {
+    List<GenericRecord> deleteRecords = Lists.newArrayListWithExpectedSize(idsToRemove.size());
+    for (GenericRecord record : readRecords(table, schemaWithMeta(table))) {
+      if (!idsToRemove.contains(record.getField("id"))) {
+        continue;
+      }
+
+      GenericRecord deleteRecord = GenericRecord.create(IcebergAcidUtil.createSerdeSchemaForDelete(SCHEMA.columns()));
+      int specId = (Integer) record.getField(MetadataColumns.SPEC_ID.name());
+      deleteRecord.setField(MetadataColumns.SPEC_ID.name(), specId);
+      PartitionKey partitionKey = new PartitionKey(table.specs().get(specId), table.schema());
+      partitionKey.partition(record);
+      deleteRecord.setField(MetadataColumns.PARTITION_COLUMN_NAME, partitionKey);
+      deleteRecord.setField(MetadataColumns.FILE_PATH.name(), record.getField(MetadataColumns.FILE_PATH.name()));
+      deleteRecord.setField(MetadataColumns.ROW_POSITION.name(), record.getField(MetadataColumns.ROW_POSITION.name()));
+
+      SCHEMA.columns().forEach(field -> deleteRecord.setField(field.name(), record.getField(field.name())));
+      deleteRecords.add(deleteRecord);
+    }
+    return deleteRecords;
+  }
+
+  private HiveIcebergDeleteWriter deleteWriter() {
+    OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, 1, 2)
+        .format(fileFormat)
+        .operationId("3")
+        .build();
+
+    HiveFileWriterFactory hfwf = new HiveFileWriterFactory(table, fileFormat, SCHEMA, null, fileFormat, null, null,
+        null, null);
+
+    TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, 1, 0);
+
+    return new HiveIcebergDeleteWriter(table.schema(), table.specs(), fileFormat, hfwf, outputFileFactory, table.io(),
+        TARGET_FILE_SIZE, taskId, "partitioned");
+  }
+}
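
For context, a minimal sketch (not part of this commit) of how the position deletes committed in
testDelete() above could be double-checked by reading the table back through Iceberg's generic
reader; it assumes the "table" and "DELETED_IDS" members of the test above and the JUnit Assert
already imported there:

    import org.apache.iceberg.data.IcebergGenerics;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.CloseableIterable;

    // After rowDelta.commit() the position delete files written by HiveIcebergDeleteWriter
    // are applied at scan time, so none of the deleted ids may show up in a plain read.
    try (CloseableIterable<Record> rows = IcebergGenerics.read(table).build()) {
      for (Record row : rows) {
        Assert.assertFalse("Row " + row.getField("id") + " should have been deleted",
            DELETED_IDS.contains(row.getField("id")));
      }
    }
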
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 4896f0899e..5857d858f5 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -59,7 +59,7 @@ import org.junit.rules.TemporaryFolder;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import static org.apache.iceberg.mr.hive.HiveIcebergWriter.getWriters;
+import static org.apache.iceberg.mr.hive.HiveIcebergWriterBase.getWriters;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class TestHiveIcebergOutputCommitter {
@@ -288,7 +288,7 @@ public class TestHiveIcebergOutputCommitter {
 
       HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, table.specs(),
           table.spec().specId(), fileFormat, hfwf, outputFileFactory, io, TARGET_FILE_SIZE,
-          TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME));
+          TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME), false);
 
       Container<Record> container = new Container<>();
 
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergUpdateWriter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergUpdateWriter.java
new file mode 100644
index 0000000000..37df7803ab
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergUpdateWriter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestHiveIcebergUpdateWriter extends HiveIcebergWriterTestBase {
+  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
+  private static final JobID JOB_ID = new JobID("test", 0);
+  private static final TaskAttemptID TASK_ATTEMPT_ID =
+      new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, 0, 0);
+  private static final Map<Integer, GenericRecord> UPDATED_RECORDS = ImmutableMap.of(
+      29, record(29, "d"),
+      61, record(61, "h"),
+      89, record(81, "a"),
+      100, record(132, "e"),
+      122, record(142, "k"));
+
+  /**
+   * This test only sends the data through the DeleteWriter and makes sure that the correct rows are removed.
+   * @throws IOException If there is an error
+   */
+  @Test
+  public void testDelete() throws IOException {
+    HiveIcebergWriter testWriter = new HiveIcebergBufferedDeleteWriter(table.schema(), table.specs(), fileFormat,
+        hiveFileWriterFactory(), outputFileFactory(), table.io(), TARGET_FILE_SIZE, new HiveConf());
+
+    update(table, testWriter);
+
+    StructLikeSet expected = rowSetWithoutIds(RECORDS, UPDATED_RECORDS.keySet());
+    StructLikeSet actual = actualRowSet(table);
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  /**
+   * This test uses the UpdateWriter to check that the values are correctly updated.
+   * @throws IOException If there is an error
+   */
+  @Test
+  public void testUpdate() throws IOException {
+    HiveIcebergWriter testWriter = new HiveIcebergUpdateWriter(table.schema(), table.specs(), table.spec().specId(),
+        fileFormat, hiveFileWriterFactory(), outputFileFactory(), table.io(), TARGET_FILE_SIZE, TASK_ATTEMPT_ID,
+        "table_name", new HiveConf());
+
+    update(table, testWriter);
+
+    StructLikeSet expected = rowSetWithoutIds(RECORDS, UPDATED_RECORDS.keySet());
+    expected.addAll(UPDATED_RECORDS.values());
+    StructLikeSet actual = actualRowSet(table);
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  private OutputFileFactory outputFileFactory() {
+    return OutputFileFactory.builderFor(table, 1, 2)
+        .format(fileFormat)
+        .operationId("3")
+        .build();
+  }
+
+  private HiveFileWriterFactory hiveFileWriterFactory() {
+    return new HiveFileWriterFactory(table, fileFormat, SCHEMA, null, fileFormat, null, null,
+        null, null);
+  }
+
+  private static void update(Table table, HiveIcebergWriter testWriter) throws IOException {
+    List<GenericRecord> updateRecords = updateRecords(table, UPDATED_RECORDS);
+
+    Collections.sort(updateRecords, Comparator.comparing(a -> a.getField("data").toString()));
+
+    Container<Record> container = new Container<>();
+    for (Record updateRecord : updateRecords) {
+      container.set(updateRecord);
+      testWriter.write(container);
+    }
+
+    testWriter.close(false);
+
+    RowDelta rowDelta = table.newRowDelta();
+    testWriter.files().deleteFiles().forEach(rowDelta::addDeletes);
+    testWriter.files().dataFiles().forEach(rowDelta::addRows);
+    rowDelta.commit();
+  }
+
+  private static List<GenericRecord> updateRecords(Table table, Map<Integer, GenericRecord> updated)
+      throws IOException {
+    List<GenericRecord> updateRecords = Lists.newArrayListWithExpectedSize(updated.size());
+    for (GenericRecord record : readRecords(table, schemaWithMeta(table))) {
+      if (!updated.containsKey(record.getField("id"))) {
+        continue;
+      }
+
+      GenericRecord updateRecord = GenericRecord.create(IcebergAcidUtil.createSerdeSchemaForUpdate(SCHEMA.columns()));
+      int specId = (Integer) record.getField(MetadataColumns.SPEC_ID.name());
+      updateRecord.setField(MetadataColumns.SPEC_ID.name(), specId);
+      PartitionKey partitionKey = new PartitionKey(table.specs().get(specId), table.schema());
+      partitionKey.partition(record);
+      updateRecord.setField(MetadataColumns.PARTITION_COLUMN_NAME, partitionKey);
+      updateRecord.setField(MetadataColumns.FILE_PATH.name(), record.getField(MetadataColumns.FILE_PATH.name()));
+      updateRecord.setField(MetadataColumns.ROW_POSITION.name(), record.getField(MetadataColumns.ROW_POSITION.name()));
+
+      SCHEMA.columns().forEach(field -> {
+        updateRecord.setField(field.name(), updated.get(record.getField("id")).getField(field.name()));
+        updateRecord.setField("__old_value_for_" + field.name(), record.getField(field.name()));
+      });
+      updateRecords.add(updateRecord);
+    }
+    return updateRecords;
+  }
+
+  private static GenericRecord record(Integer id, String data) {
+    GenericRecord record = GenericRecord.create(SCHEMA);
+    record.setField("id", id);
+    record.setField("data", data);
+    return record;
+  }
+}
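
As a related sketch (again not part of this commit, and assuming the RowDelta validation hooks of
the bundled Iceberg version), an update commit that can race with concurrent writers would
typically capture the snapshot it started from and ask Iceberg to reject conflicting data files at
commit time; "table" and "testWriter" here stand for the same objects used in update() above:

    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.expressions.Expressions;

    // Snapshot id the update was based on, captured before any delete/data files were written.
    long startingSnapshotId = table.currentSnapshot().snapshotId();

    RowDelta rowDelta = table.newRowDelta();
    testWriter.files().deleteFiles().forEach(rowDelta::addDeletes);
    testWriter.files().dataFiles().forEach(rowDelta::addRows);
    rowDelta
        .validateFromSnapshot(startingSnapshotId)            // only look at changes after this snapshot
        .conflictDetectionFilter(Expressions.alwaysTrue())   // check the whole table for conflicts
        .validateNoConflictingDataFiles()                    // fail the commit on concurrent inserts
        .commit();
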
diff --git a/iceberg/pom.xml b/iceberg/pom.xml
index adfea1b5eb..b742f19398 100644
--- a/iceberg/pom.xml
+++ b/iceberg/pom.xml
@@ -182,6 +182,12 @@
         <classifier>tests</classifier>
         <version>${project.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.iceberg</groupId>
+        <artifactId>iceberg-core</artifactId>
+        <classifier>tests</classifier>
+        <version>${iceberg.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.assertj</groupId>
         <artifactId>assertj-core</artifactId>