You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kr...@apache.org on 2022/06/29 05:08:29 UTC

[hive] branch master updated: HIVE-26319: Iceberg integration: Perform update split early (Krisztian Kasa, reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b3f13b5a86 HIVE-26319: Iceberg integration: Perform update split early (Krisztian Kasa, reviewed by Peter Vary)
b3f13b5a86 is described below

commit b3f13b5a8635893f0b8eaf6e2e92a96ce375f9f0
Author: Krisztian Kasa <kk...@cloudera.com>
AuthorDate: Wed Jun 29 07:08:18 2022 +0200

    HIVE-26319: Iceberg integration: Perform update split early (Krisztian Kasa, reviewed by Peter Vary)
---
 .../mr/hive/HiveIcebergOutputCommitter.java        | 196 +++++++++++++++------
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  81 +++++----
 .../iceberg/mr/hive/writer/WriterRegistry.java     |  15 +-
 .../positive/update_iceberg_partitioned_avro.q.out |  12 +-
 .../positive/update_iceberg_partitioned_orc.q.out  |  12 +-
 .../update_iceberg_partitioned_parquet.q.out       |  12 +-
 .../update_iceberg_unpartitioned_parquet.q.out     |  12 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java      |  13 +-
 .../hive/ql/parse/RewriteSemanticAnalyzer.java     |   3 +
 .../ql/parse/UpdateDeleteSemanticAnalyzer.java     |  90 +++++++---
 .../hadoop/hive/ql/session/SessionStateUtil.java   |  28 ++-
 11 files changed, 331 insertions(+), 143 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 52507cf65c..02ec4a1a91 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -24,7 +24,10 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -67,6 +70,7 @@ import org.apache.iceberg.mr.hive.writer.WriterRegistry;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
@@ -109,8 +113,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
 
     TaskAttemptID attemptID = context.getTaskAttemptID();
     JobConf jobConf = context.getJobConf();
-    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
-    Map<String, HiveIcebergWriter> writers = Optional.ofNullable(WriterRegistry.writers(attemptID))
+    Set<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
+    Map<String, List<HiveIcebergWriter>> writers = Optional.ofNullable(WriterRegistry.writers(attemptID))
         .orElseGet(() -> {
           LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", outputs, attemptID);
           return ImmutableMap.of();
@@ -127,11 +131,12 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
           .run(output -> {
             Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
             if (table != null) {
-              HiveIcebergWriter writer = writers.get(output);
               String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
                   attemptID.getJobID(), attemptID.getTaskID().getId());
-              if (writer != null) {
-                createFileForCommit(writer.files(), fileForCommitLocation, table.io());
+              if (writers.get(output) != null) {
+                for (HiveIcebergWriter writer : writers.get(output)) {
+                  createFileForCommit(writer.files(), fileForCommitLocation, table.io());
+                }
               } else {
                 LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID);
                 createFileForCommit(FilesForCommit.empty(), fileForCommitLocation, table.io());
@@ -162,35 +167,87 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
-    Map<String, HiveIcebergWriter> writers = WriterRegistry.removeWriters(context.getTaskAttemptID());
+    Map<String, List<HiveIcebergWriter>> writerMap = WriterRegistry.removeWriters(context.getTaskAttemptID());
 
     // Remove files if it was not done already
-    if (writers != null) {
-      for (HiveIcebergWriter writer : writers.values()) {
-        writer.close(true);
+    if (writerMap != null) {
+      for (List<HiveIcebergWriter> writerList : writerMap.values()) {
+        for (HiveIcebergWriter writer : writerList) {
+          writer.close(true);
+        }
       }
     }
   }
 
+  @Override
+  public void commitJob(JobContext originalContext) throws IOException {
+    commitJobs(Collections.singletonList(originalContext));
+  }
+
+  /**
+   * Wrapper class for storing output {@link Table} and its context for committing changes:
+   * JobContext, CommitInfo.
+   */
+  private static class OutputTable {
+    private final String catalogName;
+    private final String tableName;
+    private final Table table;
+    private final JobContext jobContext;
+    private final SessionStateUtil.CommitInfo commitInfo;
+
+    private OutputTable(String catalogName, String tableName, Table table, JobContext jobContext,
+                        SessionStateUtil.CommitInfo commitInfo) {
+      this.catalogName = catalogName;
+      this.tableName = tableName;
+      this.table = table;
+      this.jobContext = jobContext;
+      this.commitInfo = commitInfo;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      OutputTable output1 = (OutputTable) o;
+      return Objects.equals(tableName, output1.tableName) &&
+          Objects.equals(jobContext.getJobID(), output1.jobContext.getJobID());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(tableName, jobContext.getJobID());
+    }
+
+    public Optional<SessionStateUtil.CommitInfo> getCommitInfo() {
+      return Optional.ofNullable(commitInfo);
+    }
+  }
+
   /**
    * Reads the commit files stored in the temp directories and collects the generated committed data files.
    * Appends the data files to the tables. At the end removes the temporary directories.
-   * @param originalContext The job context
+   * @param originalContextList The job context list
    * @throws IOException if there is a failure accessing the files
    */
-  @Override
-  public void commitJob(JobContext originalContext) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-    JobConf jobConf = jobContext.getJobConf();
-
+  public void commitJobs(List<JobContext> originalContextList) throws IOException {
+    List<JobContext> jobContextList = originalContextList.stream()
+        .map(TezUtil::enrichContextWithVertexId)
+        .collect(Collectors.toList());
+    Set<OutputTable> outputs = collectOutputs(jobContextList);
     long startTime = System.currentTimeMillis();
-    LOG.info("Committing job {} has started", jobContext.getJobID());
 
-    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+    String ids = jobContextList.stream()
+        .map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
+    LOG.info("Committing job(s) {} has started", ids);
+
     Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
 
-    ExecutorService fileExecutor = fileExecutor(jobConf);
-    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
+    ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
     try {
       // Commits the changes for the output tables in parallel
       Tasks.foreach(outputs)
@@ -198,17 +255,10 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
           .stopOnFailure()
           .executeWith(tableExecutor)
           .run(output -> {
-            Table table = SessionStateUtil.getResource(jobConf, output)
-                .filter(o -> o instanceof Table).map(o -> (Table) o)
-                // fall back to getting the serialized table from the config
-                .orElseGet(() -> HiveIcebergStorageHandler.table(jobConf, output));
-            if (table != null) {
-              String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
-              jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
-              commitTable(table.io(), fileExecutor, jobContext, output, table.location(), catalogName);
-            } else {
-              LOG.info("CommitJob found no table object in QueryState or conf for: {}. Skipping job commit.", output);
-            }
+            JobConf jobConf = output.jobContext.getJobConf();
+            Table table = output.table;
+            jobLocations.add(generateJobLocation(table.location(), jobConf, output.jobContext.getJobID()));
+            commitTable(table.io(), fileExecutor, output);
           });
     } finally {
       fileExecutor.shutdown();
@@ -217,9 +267,37 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       }
     }
 
-    LOG.info("Commit took {} ms for job {}", System.currentTimeMillis() - startTime, jobContext.getJobID());
+    LOG.info("Commit took {} ms for job(s) {}", System.currentTimeMillis() - startTime, ids);
 
-    cleanup(jobContext, jobLocations);
+    for (JobContext jobContext : jobContextList) {
+      cleanup(jobContext, jobLocations);
+    }
+  }
+
+  private Set<OutputTable> collectOutputs(List<JobContext> jobContextList) {
+    Set<OutputTable> outputs = Sets.newHashSet();
+    for (JobContext jobContext : jobContextList) {
+      Set<String> outputNames = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+      for (String output : outputNames) {
+        Table table = SessionStateUtil.getResource(jobContext.getJobConf(), output)
+            .filter(o -> o instanceof Table).map(o -> (Table) o)
+            // fall back to getting the serialized table from the config
+            .orElseGet(() -> HiveIcebergStorageHandler.table(jobContext.getJobConf(), output));
+        if (table == null) {
+          LOG.info("CommitJob found no table object in QueryState or conf for: {}. Skipping job commit.", output);
+          continue;
+        }
+
+        SessionStateUtil.CommitInfo commitInfo = null;
+        if (SessionStateUtil.getCommitInfo(jobContext.getJobConf(), output).isPresent()) {
+          commitInfo = SessionStateUtil.getCommitInfo(jobContext.getJobConf(), output)
+              .get().get(jobContext.getJobID().toString());
+        }
+        String catalogName = HiveIcebergStorageHandler.catalogName(jobContext.getJobConf(), output);
+        outputs.add(new OutputTable(catalogName, output, table, jobContext, commitInfo));
+      }
+    }
+    return outputs;
   }
 
   /**
@@ -231,15 +309,23 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
    */
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
-    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-    JobConf jobConf = jobContext.getJobConf();
+    abortJobs(Collections.singletonList(originalContext));
+  }
+
+  public void abortJobs(List<JobContext> originalContextList) throws IOException {
+    List<JobContext> jobContextList = originalContextList.stream()
+        .map(TezUtil::enrichContextWithVertexId)
+        .collect(Collectors.toList());
+    Set<OutputTable> outputs = collectOutputs(jobContextList);
+
+    String ids = jobContextList.stream()
+        .map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
+    LOG.info("Job(s) {} are aborted. Data file cleaning started", ids);
 
-    LOG.info("Job {} is aborted. Data file cleaning started", jobContext.getJobID());
-    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
     Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
 
-    ExecutorService fileExecutor = fileExecutor(jobConf);
-    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
+    ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
     try {
       // Cleans up the changes for the output tables in parallel
       Tasks.foreach(outputs)
@@ -247,9 +333,11 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
           .executeWith(tableExecutor)
           .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
           .run(output -> {
+            JobContext jobContext = output.jobContext;
+            JobConf jobConf = jobContext.getJobConf();
             LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
 
-            Table table = HiveIcebergStorageHandler.table(jobConf, output);
+            Table table = output.table;
             String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
             jobLocations.add(jobLocation);
             // list jobLocation to get number of forCommit files
@@ -277,9 +365,11 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       }
     }
 
-    LOG.info("Job {} is aborted. Data file cleaning finished", jobContext.getJobID());
+    LOG.info("Job(s) {} are aborted. Data file cleaning finished", ids);
 
-    cleanup(jobContext, jobLocations);
+    for (JobContext jobContext : jobContextList) {
+      cleanup(jobContext, jobLocations);
+    }
   }
 
   /**
@@ -305,27 +395,26 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
    * Collects the additions to a single table and adds/commits the new files to the Iceberg table.
    * @param io The io to read the forCommit files
    * @param executor The executor used to read the forCommit files
-   * @param jobContext The job context
-   * @param name The name of the table used for loading from the catalog
-   * @param location The location of the table used for loading from the catalog
-   * @param catalogName The name of the catalog that contains the table
+   * @param outputTable The table used for loading from the catalog
    */
-  private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name, String location,
-                           String catalogName) {
+  private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable) {
+    String name = outputTable.tableName;
+    JobContext jobContext = outputTable.jobContext;
     JobConf conf = jobContext.getJobConf();
     Properties catalogProperties = new Properties();
     catalogProperties.put(Catalogs.NAME, name);
-    catalogProperties.put(Catalogs.LOCATION, location);
-    if (catalogName != null) {
-      catalogProperties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+    catalogProperties.put(Catalogs.LOCATION, outputTable.table.location());
+    if (outputTable.catalogName != null) {
+      catalogProperties.put(InputFormatConfig.CATALOG_NAME, outputTable.catalogName);
     }
     Table table = Catalogs.loadTable(conf, catalogProperties);
 
     long startTime = System.currentTimeMillis();
     LOG.info("Committing job has started for table: {}, using location: {}",
-        table, generateJobLocation(location, conf, jobContext.getJobID()));
+        table, generateJobLocation(outputTable.table.location(), conf, jobContext.getJobID()));
 
-    int numTasks = SessionStateUtil.getCommitInfo(conf, name).map(info -> info.getTaskNum()).orElseGet(() -> {
+    Optional<SessionStateUtil.CommitInfo> commitInfo = outputTable.getCommitInfo();
+    int numTasks = commitInfo.map(SessionStateUtil.CommitInfo::getTaskNum).orElseGet(() -> {
       // Fallback logic, if number of tasks are not available in the config
       // If there are reducers, then every reducer will generate a result file.
       // If this is a map only task, then every mapper will generate a result file.
@@ -334,7 +423,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
     });
 
-    FilesForCommit writeResults = collectResults(numTasks, executor, location, jobContext, io, true);
+    FilesForCommit writeResults = collectResults(
+        numTasks, executor, outputTable.table.location(), jobContext, io, true);
     if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
       if (writeResults.isEmpty()) {
         LOG.info(
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index dcec3468ad..5f1c9158aa 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -24,11 +24,13 @@ import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -77,8 +79,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobContextImpl;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.FileFormat;
@@ -100,6 +100,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Throwables;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SerializationUtil;
 import org.slf4j.Logger;
@@ -411,24 +412,28 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     Configuration configuration = SessionState.getSessionConf();
-    Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
-    if (jobContext.isPresent()) {
-      OutputCommitter committer = new HiveIcebergOutputCommitter();
+    List<JobContext> jobContextList = generateJobContext(configuration, tableName, overwrite);
+    if (jobContextList.isEmpty()) {
+      return;
+    }
+
+    HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
+    try {
+      committer.commitJobs(jobContextList);
+    } catch (Throwable e) {
+      String ids = jobContextList
+          .stream().map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(", "));
+      // Aborting the job if the commit has failed
+      LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
+          ids, tableName, e);
       try {
-        committer.commitJob(jobContext.get());
-      } catch (Throwable e) {
-        // Aborting the job if the commit has failed
-        LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
-            jobContext.get().getJobID(), tableName, e);
-        try {
-          committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
-        } catch (IOException ioe) {
-          LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", ioe);
-          // no throwing here because the original exception should be propagated
-        }
-        throw new HiveException(
-            "Error committing job: " + jobContext.get().getJobID() + " for table: " + tableName, e);
+        committer.abortJobs(jobContextList);
+      } catch (IOException ioe) {
+        LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", ioe);
+        // no throwing here because the original exception should be propagated
       }
+      throw new HiveException(
+          "Error committing job: " + ids + " for table: " + tableName, e);
     }
   }
 
@@ -653,8 +658,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
    * @param config The configuration used to get the data from
    * @return The collection of the table names as returned by TableDesc.getTableName()
    */
-  public static Collection<String> outputTables(Configuration config) {
-    return TABLE_NAME_SPLITTER.splitToList(config.get(InputFormatConfig.OUTPUT_TABLES));
+  public static Set<String> outputTables(Configuration config) {
+    return Sets.newHashSet(TABLE_NAME_SPLITTER.split(config.get(InputFormatConfig.OUTPUT_TABLES)));
   }
 
   /**
@@ -859,29 +864,35 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   }
 
   /**
-   * Generates a JobContext for the OutputCommitter for the specific table.
+   * Generates {@link JobContext}s for the OutputCommitter for the specific table.
    * @param configuration The configuration used for as a base of the JobConf
    * @param tableName The name of the table we are planning to commit
    * @param overwrite If we have to overwrite the existing table or just add the new data
-   * @return The generated JobContext
+   * @return The list of generated JobContexts, or an empty list if no commit info is present.
    */
-  private Optional<JobContext> generateJobContext(Configuration configuration, String tableName, boolean overwrite) {
+  private List<JobContext> generateJobContext(Configuration configuration, String tableName,
+      boolean overwrite) {
     JobConf jobConf = new JobConf(configuration);
-    Optional<SessionStateUtil.CommitInfo> commitInfo = SessionStateUtil.getCommitInfo(jobConf, tableName);
-    if (commitInfo.isPresent()) {
-      JobID jobID = JobID.forName(commitInfo.get().getJobIdStr());
-      commitInfo.get().getProps().forEach(jobConf::set);
-      jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
-
-      // we should only commit this current table because
-      // for multi-table inserts, this hook method will be called sequentially for each target table
-      jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
-
-      return Optional.of(new JobContextImpl(jobConf, jobID, null));
+    Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
+        SessionStateUtil.getCommitInfo(jobConf, tableName);
+    if (commitInfoMap.isPresent()) {
+      List<JobContext> jobContextList = Lists.newLinkedList();
+      for (SessionStateUtil.CommitInfo commitInfo : commitInfoMap.get().values()) {
+        JobID jobID = JobID.forName(commitInfo.getJobIdStr());
+        commitInfo.getProps().forEach(jobConf::set);
+        jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
+
+        // we should only commit this current table because
+        // for multi-table inserts, this hook method will be called sequentially for each target table
+        jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+
+        jobContextList.add(new JobContextImpl(jobConf, jobID, null));
+      }
+      return jobContextList;
     } else {
       // most likely empty write scenario
       LOG.debug("Unable to find commit information in query state for table: {}", tableName);
-      return Optional.empty();
+      return Collections.emptyList();
     }
   }
 
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java
index dba4973e41..4d7708758f 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java
@@ -19,26 +19,33 @@
 
 package org.apache.iceberg.mr.hive.writer;
 
+import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
 public class WriterRegistry {
-  private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
+  private static final Map<TaskAttemptID, Map<String, List<HiveIcebergWriter>>> writers = Maps.newConcurrentMap();
 
   private WriterRegistry() {
   }
 
-  public static Map<String, HiveIcebergWriter> removeWriters(TaskAttemptID taskAttemptID) {
+  public static Map<String, List<HiveIcebergWriter>> removeWriters(TaskAttemptID taskAttemptID) {
     return writers.remove(taskAttemptID);
   }
 
   public static void registerWriter(TaskAttemptID taskAttemptID, String tableName, HiveIcebergWriter writer) {
     writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
-    writers.get(taskAttemptID).put(tableName, writer);
+
+    Map<String, List<HiveIcebergWriter>> writersOfTableMap = writers.get(taskAttemptID);
+    writersOfTableMap.putIfAbsent(tableName, Lists.newArrayList());
+
+    List<HiveIcebergWriter> writerList = writersOfTableMap.get(tableName);
+    writerList.add(writer);
   }
 
-  public static Map<String, HiveIcebergWriter> writers(TaskAttemptID taskAttemptID) {
+  public static Map<String, List<HiveIcebergWriter>> writers(TaskAttemptID taskAttemptID) {
     return writers.get(taskAttemptID);
   }
 }
diff --git a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out
index 351011cb47..6782a0771a 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out
@@ -22,10 +22,12 @@ PREHOOK: query: update tbl_ice set b='Changes' where b in ('one', 'four') or a =
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='Changes' where b in ('one', 'four') or a = 22
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
@@ -49,16 +51,18 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola',
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[58][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[60][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
 PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
@@ -101,11 +105,13 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_ice_other
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_ice_other
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
@@ -150,11 +156,13 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_standard_other
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_standard_other
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
diff --git a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out
index 4fbe5c0500..c669b7dba6 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out
@@ -22,10 +22,12 @@ PREHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a =
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a = 22
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
@@ -49,16 +51,18 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola',
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[58][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[60][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
 PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
@@ -101,11 +105,13 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_ice_other
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_ice_other
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
@@ -150,11 +156,13 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_standard_other
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_standard_other
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
diff --git a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out
index 46913856ed..317e2b51e2 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out
@@ -22,10 +22,12 @@ PREHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a =
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a = 22
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
@@ -49,16 +51,18 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola',
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[58][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[60][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
 PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
@@ -101,11 +105,13 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_ice_other
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_ice_other
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
@@ -150,11 +156,13 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_standard_other
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_standard_other
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
diff --git a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out
index 1b0d9d5bcd..c1f4775c22 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out
@@ -22,10 +22,12 @@ PREHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a =
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a = 22
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
@@ -49,16 +51,18 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola',
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[51][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[56][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[58][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
 PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
@@ -101,11 +105,13 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_ice_other
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_ice_other
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
@@ -150,11 +156,13 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
 PREHOOK: Input: default@tbl_standard_other
 PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
 POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_ice
 POSTHOOK: Input: default@tbl_standard_other
 POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
 PREHOOK: query: select * from tbl_ice order by a, b, c
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_ice
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 0d95b62e29..c56c37dd86 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -617,18 +617,13 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       initializeSpecPath();
       fs = specPath.getFileSystem(hconf);
 
-      setWriteOperation(hconf, getConf().getTableInfo().getTableName(), getConf().getWriteOperation());
-      if (hconf instanceof JobConf) {
-        jc = (JobConf) hconf;
-      } else {
-        // test code path
-        jc = new JobConf(hconf);
-      }
+      jc = new JobConf(hconf);
+      setWriteOperation(jc, getConf().getTableInfo().getTableName(), getConf().getWriteOperation());
 
       try {
         createHiveOutputFormat(jc);
       } catch (HiveException ex) {
-        logOutputFormatError(hconf, ex);
+        logOutputFormatError(jc, ex);
         throw ex;
       }
       isCompressed = conf.getCompressed();
@@ -639,7 +634,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       }
       statsFromRecordWriter = new boolean[numFiles];
       AbstractSerDe serde = conf.getTableInfo().getSerDeClass().newInstance();
-      serde.initialize(unsetNestedColumnPaths(hconf), conf.getTableInfo().getProperties(), null);
+      serde.initialize(unsetNestedColumnPaths(jc), conf.getTableInfo().getProperties(), null);
 
       serializer = serde;
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
index cd7de19b12..f934990b13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
@@ -555,6 +555,9 @@ public abstract class RewriteSemanticAnalyzer extends CalcitePlanner {
   }
 
   protected void appendSortBy(StringBuilder rewrittenQueryStr, List<String> keys) {
+    if (keys.isEmpty()) {
+      return;
+    }
     rewrittenQueryStr.append(INDENT).append("SORT BY ");
     rewrittenQueryStr.append(StringUtils.join(keys, ","));
     rewrittenQueryStr.append("\n");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index f83f36d71a..deca32eb42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -30,13 +29,13 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.session.SessionStateUtil;
+
+import static java.util.Collections.singletonList;
 
 /**
  * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
@@ -79,7 +78,7 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
     operation = Context.Operation.UPDATE;
     boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(mTable);
 
-    if (HiveConf.getBoolVar(queryState.getConf(), HiveConf.ConfVars.SPLIT_UPDATE) && !nonNativeAcid) {
+    if (HiveConf.getBoolVar(queryState.getConf(), HiveConf.ConfVars.SPLIT_UPDATE)) {
       analyzeSplitUpdate(tree, mTable, tabNameNode);
     } else {
       reparseAndSuperAnalyze(tree, mTable, tabNameNode);
@@ -113,9 +112,6 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
   private void reparseAndSuperAnalyze(ASTNode tree, Table mTable, ASTNode tabNameNode) throws SemanticException {
     List<? extends Node> children = tree.getChildren();
 
-    // save the operation type into the query state
-    SessionStateUtil.addResource(conf, Context.Operation.class.getSimpleName(), operation.name());
-
     StringBuilder rewrittenQueryStr = new StringBuilder();
     rewrittenQueryStr.append("insert into table ");
     rewrittenQueryStr.append(getFullTableNameForSQL(tabNameNode));
@@ -257,6 +253,9 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
     }
   }
 
+  public static final String DELETE_PREFIX = "__d__";
+  public static final String SUB_QUERY_ALIAS = "s";
+
   private void analyzeSplitUpdate(ASTNode tree, Table mTable, ASTNode tabNameNode) throws SemanticException {
     operation = Context.Operation.UPDATE;
 
@@ -282,11 +281,45 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
 
     List<FieldSchema> nonPartCols = mTable.getCols();
     Map<String, String> colNameToDefaultConstraint = getColNameToDefaultValueMap(mTable);
-    List<String> values = new ArrayList<>(mTable.getCols().size());
     StringBuilder rewrittenQueryStr = createRewrittenQueryStrBuilder();
-    rewrittenQueryStr.append("(SELECT ROW__ID");
+    rewrittenQueryStr.append("(SELECT ");
+
+    boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(mTable);
+    int columnOffset;
+    List<String> deleteValues;
+    if (nonNativeAcid) {
+      List<FieldSchema> acidSelectColumns = mTable.getStorageHandler().acidSelectColumns(mTable, operation);
+      deleteValues = new ArrayList<>(acidSelectColumns.size());
+      for (FieldSchema fieldSchema : acidSelectColumns) {
+        String identifier = HiveUtils.unparseIdentifier(fieldSchema.getName(), this.conf);
+        rewrittenQueryStr.append(identifier).append(" AS ");
+        String prefixedIdentifier = HiveUtils.unparseIdentifier(DELETE_PREFIX + fieldSchema.getName(), this.conf);
+        rewrittenQueryStr.append(prefixedIdentifier);
+        rewrittenQueryStr.append(",");
+        deleteValues.add(String.format("%s.%s", SUB_QUERY_ALIAS, prefixedIdentifier));
+      }
+
+      columnOffset = acidSelectColumns.size();
+    } else {
+      rewrittenQueryStr.append("ROW__ID,");
+      deleteValues = new ArrayList<>(1 + mTable.getPartCols().size());
+      deleteValues.add(SUB_QUERY_ALIAS + ".ROW__ID");
+      for (FieldSchema fieldSchema : mTable.getPartCols()) {
+        deleteValues.add(SUB_QUERY_ALIAS + "." + HiveUtils.unparseIdentifier(fieldSchema.getName(), conf));
+      }
+      columnOffset = 1;
+    }
+
+    List<String> insertValues = new ArrayList<>(mTable.getCols().size());
+    boolean first = true;
+
     for (int i = 0; i < nonPartCols.size(); i++) {
-      rewrittenQueryStr.append(',');
+      if (first) {
+        first = false;
+      } else {
+        rewrittenQueryStr.append(",");
+      }
+
       String name = nonPartCols.get(i).getName();
       ASTNode setCol = setCols.get(name);
       String identifier = HiveUtils.unparseIdentifier(name, this.conf);
@@ -299,7 +332,7 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
           rewrittenQueryStr.append(identifier);
           // This is one of the columns we're setting, record it's position so we can come back
           // later and patch it up. 0th is ROW_ID
-          setColExprs.put(i + 1, setCol);
+          setColExprs.put(i + columnOffset, setCol);
         }
       } else {
         rewrittenQueryStr.append(identifier);
@@ -307,16 +340,28 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
       rewrittenQueryStr.append(" AS ");
       rewrittenQueryStr.append(identifier);
 
-      values.add("s." + identifier);
+      insertValues.add(SUB_QUERY_ALIAS + "." + identifier);
     }
     addPartitionColsToSelect(mTable.getPartCols(), rewrittenQueryStr);
-    addPartitionColsAsValues(mTable.getPartCols(), "s", values);
-    rewrittenQueryStr.append(" FROM ").append(getFullTableNameForSQL(tabNameNode)).append(") s\n");
+    addPartitionColsAsValues(mTable.getPartCols(), SUB_QUERY_ALIAS, insertValues);
+    rewrittenQueryStr.append(" FROM ").append(getFullTableNameForSQL(tabNameNode)).append(") ");
+    rewrittenQueryStr.append(SUB_QUERY_ALIAS).append("\n");
 
-    appendInsertBranch(rewrittenQueryStr, null, values);
-    appendDeleteBranch(rewrittenQueryStr, null, "s", Collections.singletonList("s.ROW__ID "));
+    appendInsertBranch(rewrittenQueryStr, null, insertValues);
+    appendInsertBranch(rewrittenQueryStr, null, deleteValues);
 
-    appendSortBy(rewrittenQueryStr, Collections.singletonList("s.ROW__ID "));
+    List<String> sortKeys;
+    if (nonNativeAcid) {
+      sortKeys = mTable.getStorageHandler().acidSortColumns(mTable, Context.Operation.DELETE).stream()
+              .map(fieldSchema -> String.format(
+                      "%s.%s",
+                      SUB_QUERY_ALIAS,
+                      HiveUtils.unparseIdentifier(DELETE_PREFIX + fieldSchema.getName(), this.conf)))
+              .collect(Collectors.toList());
+    } else {
+      sortKeys = singletonList(SUB_QUERY_ALIAS + ".ROW__ID ");
+    }
+    appendSortBy(rewrittenQueryStr, sortKeys);
 
     ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
     Context rewrittenCtx = rr.rewrittenCtx;
@@ -335,14 +380,9 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
 
     patchProjectionForUpdate(rewrittenInsert, setColExprs);
 
-    try {
-      useSuper = true;
-      // Note: this will overwrite this.ctx with rewrittenCtx
-      rewrittenCtx.setEnableUnparse(false);
-      super.analyze(rewrittenTree, rewrittenCtx);
-    } finally {
-      useSuper = false;
-    }
+    // Note: this will overwrite this.ctx with rewrittenCtx
+    rewrittenCtx.setEnableUnparse(false);
+    analyzeRewrittenTree(rewrittenTree, rewrittenCtx);
 
     updateOutputs(mTable);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java
index 2cfe9d1e79..c400ace122 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.session;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
@@ -74,12 +75,10 @@ public class SessionStateUtil {
   /**
    * @param conf Configuration object used for getting the query state, should contain the query id
    * @param tableName Name of the table for which the commit info should be retrieved
-   * @return the CommitInfo, or empty Optional if not present
+   * @return Optional of the commit info map (key: jobId, value: {@link CommitInfo}), or empty Optional if not present
    */
-  public static Optional<CommitInfo> getCommitInfo(Configuration conf, String tableName) {
-    return getResource(conf, COMMIT_INFO_PREFIX + tableName)
-        .filter(o -> o instanceof CommitInfo)
-        .map(o -> (CommitInfo) o);
+  public static Optional<Map<String, CommitInfo>> getCommitInfo(Configuration conf, String tableName) {
+    return getResource(conf, COMMIT_INFO_PREFIX + tableName).map(o -> (Map<String, CommitInfo>)o);
   }
 
   /**
@@ -92,11 +91,22 @@ public class SessionStateUtil {
    */
   public static boolean addCommitInfo(Configuration conf, String tableName, String jobId, int taskNum,
                                          Map<String, String> additionalProps) {
+
     CommitInfo commitInfo = new CommitInfo()
-        .withJobID(jobId)
-        .withTaskNum(taskNum)
-        .withProps(additionalProps);
-    return addResource(conf, COMMIT_INFO_PREFIX + tableName, commitInfo);
+            .withJobID(jobId)
+            .withTaskNum(taskNum)
+            .withProps(additionalProps);
+
+    Optional<Map<String, CommitInfo>> commitInfoMap = getCommitInfo(conf, tableName);
+    if (commitInfoMap.isPresent()) {
+      commitInfoMap.get().put(jobId, commitInfo);
+      return true;
+    }
+
+    Map<String, CommitInfo> newCommitInfoMap = new HashMap<>();
+    newCommitInfoMap.put(jobId, commitInfo);
+
+    return addResource(conf, COMMIT_INFO_PREFIX + tableName, newCommitInfoMap);
   }
 
   private static Optional<QueryState> getQueryState(Configuration conf) {