Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/03/30 12:07:38 UTC

[iceberg] branch master updated: Hive: Implement multi-table inserts (#2228)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d151034  Hive: Implement multi-table inserts (#2228)
d151034 is described below

commit d1510340eaff68d88a2e8194d58e7e493af02bcc
Author: pvary <pv...@cloudera.com>
AuthorDate: Tue Mar 30 14:07:23 2021 +0200

    Hive: Implement multi-table inserts (#2228)
---
 .../org/apache/iceberg/mr/InputFormatConfig.java   |   9 +-
 .../mr/hive/HiveIcebergOutputCommitter.java        | 316 ++++++++++++++-------
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java   |   8 +-
 .../iceberg/mr/hive/HiveIcebergRecordWriter.java   |  15 +-
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  30 +-
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |  11 +-
 .../iceberg/mr/hive/HiveIcebergTestUtils.java      |   3 +-
 .../mr/hive/TestHiveIcebergOutputCommitter.java    |  39 +--
 .../TestHiveIcebergStorageHandlerWithEngine.java   |  41 +++
 9 files changed, 331 insertions(+), 141 deletions(-)
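
For readers skimming the patch: the feature is Hive's multi-table insert, where a single
FROM clause feeds several INSERT branches, each targeting a different Iceberg table. A
minimal sketch of such a statement, mirroring the new testMultiTableInsert case at the end
of this patch (the string is what the test passes to its Hive shell harness):

    // Illustration only, not part of the commit. One scan of "customers" feeds two targets.
    String multiInsert =
        "FROM customers " +
        "INSERT INTO target1 SELECT customer_id, first_name " +
        "INSERT INTO target2 SELECT last_name, customer_id";
    // In the test this is executed via shell.executeStatement(multiInsert).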

diff --git a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index e39ba3c..38e6606 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -46,7 +46,7 @@ public class InputFormatConfig {
   public static final String TABLE_LOCATION = "iceberg.mr.table.location";
   public static final String TABLE_SCHEMA = "iceberg.mr.table.schema";
   public static final String PARTITION_SPEC = "iceberg.mr.table.partition.spec";
-  public static final String SERIALIZED_TABLE = "iceberg.mr.serialized.table";
+  public static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";
   public static final String LOCALITY = "iceberg.mr.locality";
   public static final String CATALOG = "iceberg.mr.catalog";
   public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = "iceberg.mr.catalog.hadoop.warehouse.location";
@@ -54,8 +54,11 @@ public class InputFormatConfig {
   public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
   public static final String EXTERNAL_TABLE_PURGE = "external.table.purge";
 
-  public static final String COMMIT_THREAD_POOL_SIZE = "iceberg.mr.commit.thread.pool.size";
-  public static final int COMMIT_THREAD_POOL_SIZE_DEFAULT = 10;
+  public static final String OUTPUT_TABLES = "iceberg.mr.output.tables";
+  public static final String COMMIT_TABLE_THREAD_POOL_SIZE = "iceberg.mr.commit.table.thread.pool.size";
+  public static final int COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT = 10;
+  public static final String COMMIT_FILE_THREAD_POOL_SIZE = "iceberg.mr.commit.file.thread.pool.size";
+  public static final int COMMIT_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
   public static final String WRITE_TARGET_FILE_SIZE = "iceberg.mr.write.target.file.size";
 
   public static final String CASE_SENSITIVE = "iceberg.mr.case.sensitive";
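
As a reading aid (illustration only, not part of the patch): SERIALIZED_TABLE is replaced by
a per-table key built from SERIALIZED_TABLE_PREFIX plus the table name, and the single commit
pool is split into a table pool and a file pool. Roughly how the later hunks consume these
keys, where conf is a Hadoop Configuration and name is the value TableDesc.getTableName()
returns:

    // One serialized table per "iceberg.mr.serialized.table.<name>" key
    Table table = SerializationUtil.deserializeFromBase64(
        conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + name));
    // Separate pools: per-table commits and forCommit file reads
    int tablePoolSize = conf.getInt(InputFormatConfig.COMMIT_TABLE_THREAD_POOL_SIZE,
        InputFormatConfig.COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT);
    int filePoolSize = conf.getInt(InputFormatConfig.COMMIT_FILE_THREAD_POOL_SIZE,
        InputFormatConfig.COMMIT_FILE_THREAD_POOL_SIZE_DEFAULT);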
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index d55a142..3479b36 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -22,10 +22,11 @@ package org.apache.iceberg.mr.hive;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.hadoop.conf.Configuration;
@@ -90,22 +91,36 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     TaskAttemptID attemptID = context.getTaskAttemptID();
-    String fileForCommitLocation = generateFileForCommitLocation(context.getJobConf(),
-        attemptID.getJobID(), attemptID.getTaskID().getId());
-    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.getWriter(attemptID);
+    JobConf jobConf = context.getJobConf();
+    Map<String, HiveIcebergRecordWriter> writers = HiveIcebergRecordWriter.getWriters(attemptID);
+    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
 
-    DataFile[] closedFiles;
-    if (writer != null) {
-      closedFiles = writer.dataFiles();
-    } else {
-      closedFiles = new DataFile[0];
+    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    try {
+      // Generates commit files for the target tables in parallel
+      Tasks.foreach(outputs)
+          .retry(3)
+          .stopOnFailure()
+          .throwFailureWhenFinished()
+          .executeWith(tableExecutor)
+          .run(output -> {
+            Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
+            HiveIcebergRecordWriter writer = writers.get(output);
+            DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0];
+            String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
+                attemptID.getJobID(), attemptID.getTaskID().getId());
+
+            // Creating the file containing the data files generated by this task for this table
+            createFileForCommit(closedFiles, fileForCommitLocation, table.io());
+          }, IOException.class);
+    } finally {
+      if (tableExecutor != null) {
+        tableExecutor.shutdown();
+      }
     }
 
-    // Creating the file containing the data files generated by this task
-    createFileForCommit(closedFiles, fileForCommitLocation, HiveIcebergStorageHandler.table(context.getJobConf()).io());
-
     // remove the writer to release the object
-    HiveIcebergRecordWriter.removeWriter(attemptID);
+    HiveIcebergRecordWriter.removeWriters(attemptID);
   }
 
   /**
@@ -118,52 +133,61 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
 
     // Clean up writer data from the local store
-    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
+    Map<String, HiveIcebergRecordWriter> writers = HiveIcebergRecordWriter.removeWriters(context.getTaskAttemptID());
 
     // Remove files if it was not done already
-    if (writer != null) {
-      writer.close(true);
+    if (writers != null) {
+      for (HiveIcebergRecordWriter writer : writers.values()) {
+        writer.close(true);
+      }
     }
   }
 
   /**
-   * Reads the commit files stored in the temp directory and collects the generated committed data files.
-   * Appends the data files to the table. At the end removes the temporary directory.
+   * Reads the commit files stored in the temp directories and collects the generated committed data files.
+   * Appends the data files to the tables. At the end removes the temporary directories.
    * @param originalContext The job context
-   * @throws IOException if there is a failure deleting the files
+   * @throws IOException if there is a failure accessing the files
    */
   @Override
   public void commitJob(JobContext originalContext) throws IOException {
     JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-
-    JobConf conf = jobContext.getJobConf();
-    Table table = Catalogs.loadTable(conf);
+    JobConf jobConf = jobContext.getJobConf();
 
     long startTime = System.currentTimeMillis();
-    LOG.info("Committing job has started for table: {}, using location: {}", table,
-        generateJobLocation(conf, jobContext.getJobID()));
+    LOG.info("Committing job {} has started", jobContext.getJobID());
 
-    FileIO io = HiveIcebergStorageHandler.table(jobContext.getJobConf()).io();
-    List<DataFile> dataFiles = dataFiles(jobContext, io, true);
+    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+    Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
 
-    if (dataFiles.size() > 0) {
-      // Appending data files to the table
-      AppendFiles append = table.newAppend();
-      dataFiles.forEach(append::appendFile);
-      append.commit();
-      LOG.info("Commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime, table,
-          dataFiles.size());
-      LOG.debug("Added files {}", dataFiles);
-    } else {
-      LOG.info("Commit took {} ms for table: {} with no new files", System.currentTimeMillis() - startTime, table);
+    ExecutorService fileExecutor = fileExecutor(jobConf);
+    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    try {
+      // Commits the changes for the output tables in parallel
+      Tasks.foreach(outputs)
+          .throwFailureWhenFinished()
+          .stopOnFailure()
+          .executeWith(tableExecutor)
+          .run(output -> {
+            Table table = HiveIcebergStorageHandler.table(jobConf, output);
+            jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
+            commitTable(table.io(), fileExecutor, jobContext, output, table.location());
+          });
+    } finally {
+      fileExecutor.shutdown();
+      if (tableExecutor != null) {
+        tableExecutor.shutdown();
+      }
     }
 
-    cleanup(jobContext);
+    LOG.info("Commit took {} ms for job {}", System.currentTimeMillis() - startTime, jobContext.getJobID());
+
+    cleanup(jobContext, jobLocations);
   }
 
   /**
-   * Removes the generated data files, if there is a commit file already generated for them.
-   * The cleanup at the end removes the temporary directory as well.
+   * Removes the generated data files if there is a commit file already generated for them.
+   * The cleanup at the end removes the temporary directories as well.
    * @param originalContext The job context
    * @param status The status of the job
    * @throws IOException if there is a failure deleting the files
@@ -171,117 +195,207 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
   @Override
   public void abortJob(JobContext originalContext, int status) throws IOException {
     JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
+    JobConf jobConf = jobContext.getJobConf();
 
-    String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
-    LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location);
-
-    FileIO io = HiveIcebergStorageHandler.table(jobContext.getJobConf()).io();
-    List<DataFile> dataFiles = dataFiles(jobContext, io, false);
+    LOG.info("Job {} is aborted. Data file cleaning started", jobContext.getJobID());
+    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+    Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
 
-    // Check if we have files already committed and remove data files if there are any
-    if (dataFiles.size() > 0) {
-      Tasks.foreach(dataFiles)
-          .retry(3)
+    ExecutorService fileExecutor = fileExecutor(jobConf);
+    ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+    try {
+      // Cleans up the changes for the output tables in parallel
+      Tasks.foreach(outputs)
           .suppressFailureWhenFinished()
-          .onFailure((file, exc) -> LOG.debug("Failed on to remove data file {} on abort job", file.path(), exc))
-          .run(file -> io.deleteFile(file.path().toString()));
+          .executeWith(tableExecutor)
+          .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
+          .run(output -> {
+            LOG.info("Cleaning job for table {}", jobContext.getJobID(), output);
+
+            Table table = HiveIcebergStorageHandler.table(jobConf, output);
+            jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
+            Collection<DataFile> dataFiles = dataFiles(fileExecutor, table.location(), jobContext, table.io(), false);
+
+            // Check if we have files already committed and remove data files if there are any
+            if (dataFiles.size() > 0) {
+              Tasks.foreach(dataFiles)
+                  .retry(3)
+                  .suppressFailureWhenFinished()
+                  .executeWith(fileExecutor)
+                  .onFailure((file, exc) -> LOG.warn("Failed to remove data file {} on abort job", file.path(), exc))
+                  .run(file -> table.io().deleteFile(file.path().toString()));
+            }
+          });
+    } finally {
+      fileExecutor.shutdown();
+      if (tableExecutor != null) {
+        tableExecutor.shutdown();
+      }
     }
 
-    cleanup(jobContext);
+    LOG.info("Job {} is aborted. Data file cleaning finished", jobContext.getJobID());
+
+    cleanup(jobContext, jobLocations);
+  }
+
+  /**
+   * Collects the additions to a single table and adds/commits the new files to the Iceberg table.
+   * @param io The io to read the forCommit files
+   * @param executor The executor used to read the forCommit files
+   * @param jobContext The job context
+   * @param name The name of the table used for loading from the catalog
+   * @param location The location of the table used for loading from the catalog
+   */
+  private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name, String location) {
+    JobConf conf = jobContext.getJobConf();
+    Properties catalogProperties = new Properties();
+    catalogProperties.put(Catalogs.NAME, name);
+    catalogProperties.put(Catalogs.LOCATION, location);
+    Table table = Catalogs.loadTable(conf, catalogProperties);
+
+    long startTime = System.currentTimeMillis();
+    LOG.info("Committing job has started for table: {}, using location: {}",
+        table, generateJobLocation(location, conf, jobContext.getJobID()));
+
+    Collection<DataFile> dataFiles = dataFiles(executor, location, jobContext, io, true);
+
+    if (dataFiles.size() > 0) {
+      // Appending data files to the table
+      AppendFiles append = table.newAppend();
+      dataFiles.forEach(append::appendFile);
+      append.commit();
+      LOG.info("Commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime, table,
+          dataFiles.size());
+      LOG.debug("Added files {}", dataFiles);
+    } else {
+      LOG.info("Commit took {} ms for table: {} with no new files", System.currentTimeMillis() - startTime, table);
+    }
   }
 
   /**
-   * Cleans up the jobs temporary location.
+   * Cleans up the job's temporary locations. For every target table there is a temp dir to clean up.
    * @param jobContext The job context
+   * @param jobLocations The locations to clean up
    * @throws IOException if there is a failure deleting the files
    */
-  private void cleanup(JobContext jobContext) throws IOException {
-    String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
-    LOG.info("Cleaning for job: {} on location: {}", jobContext.getJobID(), location);
+  private void cleanup(JobContext jobContext, Collection<String> jobLocations) throws IOException {
+    JobConf jobConf = jobContext.getJobConf();
+
+    LOG.info("Cleaning for job {} started", jobContext.getJobID());
 
-    // Remove the job's temp directory recursively.
-    // Intentionally used foreach on a single item. Using the Tasks API here only for the retry capability.
-    Tasks.foreach(location)
+    // Remove the job's temp directories recursively.
+    Tasks.foreach(jobLocations)
         .retry(3)
         .suppressFailureWhenFinished()
-        .onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} on cleanup job", file, exc))
-        .run(file -> {
-          Path toDelete = new Path(file);
-          FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf());
+        .onFailure((jobLocation, exc) -> LOG.debug("Failed to remove directory {} on job cleanup", jobLocation, exc))
+        .run(jobLocation -> {
+          LOG.info("Cleaning location: {}", jobLocation);
+          Path toDelete = new Path(jobLocation);
+          FileSystem fs = Util.getFs(toDelete, jobConf);
           fs.delete(toDelete, true);
         }, IOException.class);
+
+    LOG.info("Cleaning for job {} finished", jobContext.getJobID());
+  }
+
+  /**
+   * Executor service for parallel handling of file reads. Should be shared when committing multiple tables.
+   * @param conf The configuration containing the pool size
+   * @return The generated executor service
+   */
+  private static ExecutorService fileExecutor(Configuration conf) {
+    int size = conf.getInt(InputFormatConfig.COMMIT_FILE_THREAD_POOL_SIZE,
+        InputFormatConfig.COMMIT_FILE_THREAD_POOL_SIZE_DEFAULT);
+    return Executors.newFixedThreadPool(
+        size,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setPriority(Thread.NORM_PRIORITY)
+            .setNameFormat("iceberg-commit-file-pool-%d")
+            .build());
   }
 
   /**
-   * Get the data committed data files for this job.
+   * Executor service for parallel handling of table manipulation. Could return null if no parallelism is possible.
+   * @param conf The configuration containing the pool size
+   * @param maxThreadNum The number of requests we want to handle (might be decreased further by configuration)
+   * @return The generated executor service, or null if executor is not needed.
+   */
+  private static ExecutorService tableExecutor(Configuration conf, int maxThreadNum) {
+    int size = conf.getInt(InputFormatConfig.COMMIT_TABLE_THREAD_POOL_SIZE,
+        InputFormatConfig.COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT);
+    size = Math.min(maxThreadNum, size);
+    if (size > 1) {
+      return Executors.newFixedThreadPool(
+          size,
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setPriority(Thread.NORM_PRIORITY)
+              .setNameFormat("iceberg-commit-table-pool-%d")
+              .build());
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get the committed data files for this table and job.
+   * @param executor The executor used for reading the forCommit files in parallel
+   * @param location The location of the table
    * @param jobContext The job context
   * @param io The FileIO used for reading the files generated for commit
    * @param throwOnFailure If <code>true</code> then it throws an exception on failure
    * @return The list of the committed data files
    */
-  private static List<DataFile> dataFiles(JobContext jobContext, FileIO io, boolean throwOnFailure) {
+  private static Collection<DataFile> dataFiles(ExecutorService executor, String location, JobContext jobContext,
+      FileIO io, boolean throwOnFailure) {
     JobConf conf = jobContext.getJobConf();
     // If there are reducers, then every reducer will generate a result file.
     // If this is a map only task, then every mapper will generate a result file.
     int expectedFiles = conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
 
-    ExecutorService executor = null;
-    try {
-      // Creating executor service for parallel handling of file reads
-      executor = Executors.newFixedThreadPool(
-          conf.getInt(InputFormatConfig.COMMIT_THREAD_POOL_SIZE, InputFormatConfig.COMMIT_THREAD_POOL_SIZE_DEFAULT),
-          new ThreadFactoryBuilder()
-              .setDaemon(true)
-              .setPriority(Thread.NORM_PRIORITY)
-              .setNameFormat("iceberg-commit-pool-%d")
-              .build());
-
-      List<DataFile> dataFiles = Collections.synchronizedList(new ArrayList<>());
+    Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
 
-      // Reading the committed files. The assumption here is that the taskIds are generated in sequential order
-      // starting from 0.
-      Tasks.range(expectedFiles)
-          .throwFailureWhenFinished(throwOnFailure)
-          .executeWith(executor)
-          .retry(3)
-          .run(taskId -> {
-            String taskFileName = generateFileForCommitLocation(conf, jobContext.getJobID(), taskId);
-            dataFiles.addAll(Arrays.asList(readFileForCommit(taskFileName, io)));
-          });
+    // Reading the committed files. The assumption here is that the taskIds are generated in sequential order
+    // starting from 0.
+    Tasks.range(expectedFiles)
+        .throwFailureWhenFinished(throwOnFailure)
+        .executeWith(executor)
+        .retry(3)
+        .run(taskId -> {
+          String taskFileName = generateFileForCommitLocation(location, conf, jobContext.getJobID(), taskId);
+          dataFiles.addAll(Arrays.asList(readFileForCommit(taskFileName, io)));
+        });
 
-      return dataFiles;
-    } finally {
-      if (executor != null) {
-        executor.shutdown();
-      }
-    }
+    return dataFiles;
   }
 
   /**
    * Generates the job temp location based on the job configuration.
-   * Currently it uses QUERY_LOCATION-jobId.
+   * Currently it uses TABLE_LOCATION/temp/QUERY_ID-jobId.
+   * @param location The location of the table
    * @param conf The job's configuration
    * @param jobId The JobID for the task
    * @return The file to store the results
    */
   @VisibleForTesting
-  static String generateJobLocation(Configuration conf, JobID jobId) {
-    String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION);
+  static String generateJobLocation(String location, Configuration conf, JobID jobId) {
     String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
-    return tableLocation + "/temp/" + queryId + "-" + jobId;
+    return location + "/temp/" + queryId + "-" + jobId;
   }
 
   /**
    * Generates file location based on the task configuration and a specific task id.
    * This file will be used to store the data required to generate the Iceberg commit.
-   * Currently it uses QUERY_LOCATION-jobId/task-[0..numTasks).forCommit.
+   * Currently it uses TABLE_LOCATION/temp/QUERY_ID-jobId/task-[0..numTasks).forCommit.
+   * @param location The location of the table
    * @param conf The job's configuration
    * @param jobId The jobId for the task
    * @param taskId The taskId for the commit file
    * @return The file to store the results
    */
-  private static String generateFileForCommitLocation(Configuration conf, JobID jobId, int taskId) {
-    return generateJobLocation(conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION;
+  private static String generateFileForCommitLocation(String location, Configuration conf, JobID jobId, int taskId) {
+    return generateJobLocation(location, conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION;
   }
 
   private static void createFileForCommit(DataFile[] closedFiles, String location, FileIO io)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 9a2ae60..080c015 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.io.NullWritable;
@@ -41,6 +42,7 @@ import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.util.PropertyUtil;
 
@@ -66,7 +68,8 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
 
   private static HiveIcebergRecordWriter writer(JobConf jc) {
     TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
-    Table table = HiveIcebergStorageHandler.table(jc);
+    // The config comes from the FileSinkOperator, which has its own config for every target table
+    Table table = HiveIcebergStorageHandler.table(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME));
     Schema schema = HiveIcebergStorageHandler.schema(jc);
     PartitionSpec spec = table.spec();
     FileFormat fileFormat = FileFormat.valueOf(PropertyUtil.propertyAsString(table.properties(),
@@ -79,8 +82,9 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
     OutputFileFactory outputFileFactory =
         new OutputFileFactory(spec, fileFormat, location, io, encryption, taskAttemptID.getTaskID().getId(),
             taskAttemptID.getId(), jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
+    String tableName = jc.get(Catalogs.NAME);
     HiveIcebergRecordWriter writer = new HiveIcebergRecordWriter(schema, spec, fileFormat,
-        new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, taskAttemptID);
+        new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, taskAttemptID, tableName);
 
     return writer;
   }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
index 8622d1a..2adf43d 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -38,6 +37,7 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,25 +50,26 @@ class HiveIcebergRecordWriter extends PartitionedFanoutWriter<Record>
   private final PartitionKey currentKey;
   private final FileIO io;
 
-  // <TaskAttemptId, HiveIcebergRecordWriter> map to store the active writers
+  // <TaskAttemptId, <TABLE_NAME, HiveIcebergRecordWriter>> map to store the active writers
   // Stored in concurrent map, since some executor engines can share containers
-  private static final Map<TaskAttemptID, HiveIcebergRecordWriter> writers = new ConcurrentHashMap<>();
+  private static final Map<TaskAttemptID, Map<String, HiveIcebergRecordWriter>> writers = Maps.newConcurrentMap();
 
-  static HiveIcebergRecordWriter removeWriter(TaskAttemptID taskAttemptID) {
+  static Map<String, HiveIcebergRecordWriter> removeWriters(TaskAttemptID taskAttemptID) {
     return writers.remove(taskAttemptID);
   }
 
-  static HiveIcebergRecordWriter getWriter(TaskAttemptID taskAttemptID) {
+  static Map<String, HiveIcebergRecordWriter> getWriters(TaskAttemptID taskAttemptID) {
     return writers.get(taskAttemptID);
   }
 
   HiveIcebergRecordWriter(Schema schema, PartitionSpec spec, FileFormat format,
       FileAppenderFactory<Record> appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-      TaskAttemptID taskAttemptID) {
+      TaskAttemptID taskAttemptID, String tableName) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.io = io;
     this.currentKey = new PartitionKey(spec, schema);
-    writers.put(taskAttemptID, this);
+    writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
+    writers.get(taskAttemptID).put(tableName, this);
   }
 
   @Override
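
Illustration only (not part of the patch): the writer registry is now two levels deep, keyed
first by task attempt and then by table name, so the committer can look up the writer for a
given output table; attemptID and tableName stand in for the current TaskAttemptID and the
output table name:

    // Per-attempt map of <table name, writer>; may be null if the attempt wrote nothing
    Map<String, HiveIcebergRecordWriter> perTable = HiveIcebergRecordWriter.getWriters(attemptID);
    HiveIcebergRecordWriter writer = perTable != null ? perTable.get(tableName) : null;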
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 42a4982..015bcb0 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
@@ -41,10 +42,14 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.SerializationUtil;
 
 public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler {
+  private static final Splitter TABLE_NAME_SPLITTER = Splitter.on("..");
+  private static final String TABLE_NAME_SEPARATOR = "..";
 
   static final String WRITE_KEY = "HiveIcebergStorageHandler_write";
 
@@ -108,7 +113,13 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
     if (tableDesc != null && tableDesc.getProperties() != null &&
         tableDesc.getProperties().get(WRITE_KEY) != null) {
+      Preconditions.checkArgument(!tableDesc.getTableName().contains(TABLE_NAME_SEPARATOR),
+          "Can not handle table " + tableDesc.getTableName() + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'");
+      String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES);
+      tables = tables == null ? tableDesc.getTableName() : tables + TABLE_NAME_SEPARATOR + tableDesc.getTableName();
+
       jobConf.set("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName());
+      jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables);
     }
   }
 
@@ -142,12 +153,22 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   }
 
   /**
-   * Returns the Table serialized to the configuration.
+   * Returns the Table serialized to the configuration based on the table name.
    * @param config The configuration used to get the data from
+   * @param name The name of the table we need as returned by TableDesc.getTableName()
    * @return The Table
    */
-  public static Table table(Configuration config) {
-    return SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.SERIALIZED_TABLE));
+  public static Table table(Configuration config, String name) {
+    return SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + name));
+  }
+
+  /**
+   * Returns the names of the output tables stored in the configuration.
+   * @param config The configuration used to get the data from
+   * @return The collection of the table names as returned by TableDesc.getTableName()
+   */
+  public static Collection<String> outputTables(Configuration config) {
+    return TABLE_NAME_SPLITTER.splitToList(config.get(InputFormatConfig.OUTPUT_TABLES));
   }
 
   /**
@@ -190,7 +211,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     map.put(InputFormatConfig.TABLE_SCHEMA, schemaJson);
 
     if (table instanceof Serializable) {
-      map.put(InputFormatConfig.SERIALIZED_TABLE, SerializationUtil.serializeToBase64(table));
+      map.put(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableDesc.getTableName(),
+          SerializationUtil.serializeToBase64(table));
     }
 
     // We need to remove this otherwise the job.xml will be invalid as column comments are separated with '\0' and
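
A short illustration (not part of the patch) of how the output table list round-trips through
the job configuration with the ".." separator chosen above; db.t1 and db.t2 are hypothetical
names as returned by TableDesc.getTableName():

    // configureJobConf() appends each target: "db.t1", then "db.t1..db.t2"
    conf.set(InputFormatConfig.OUTPUT_TABLES, "db.t1..db.t2");
    // outputTables() splits the list back for the committer
    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(conf);
    // outputs now contains [db.t1, db.t2]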
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index a67c513..0a43189 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiFunction;
 import org.apache.hadoop.conf.Configuration;
@@ -64,6 +65,7 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -93,12 +95,9 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
   @Override
   public List<InputSplit> getSplits(JobContext context) {
     Configuration conf = context.getConfiguration();
-    Table table;
-    if (conf.get(InputFormatConfig.SERIALIZED_TABLE) != null) {
-      table = SerializationUtil.deserializeFromBase64(conf.get(InputFormatConfig.SERIALIZED_TABLE));
-    } else {
-      table = Catalogs.loadTable(conf);
-    }
+    Table table = Optional
+        .ofNullable(HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
+        .orElseGet(() -> Catalogs.loadTable(conf));
 
     TableScan scan = table.newScan()
             .caseSensitive(conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT));
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
index cc1f2cb..b751af8 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
@@ -263,6 +263,7 @@ public class HiveIcebergTestUtils {
         .collect(Collectors.toList());
 
     Assert.assertEquals(dataFileNum, dataFiles.size());
-    Assert.assertFalse(new File(HiveIcebergOutputCommitter.generateJobLocation(conf, jobId)).exists());
+    Assert.assertFalse(
+        new File(HiveIcebergOutputCommitter.generateJobLocation(table.location(), conf, jobId)).exists());
   }
 }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 0e132df..1f5466b 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -47,10 +47,12 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.TestHelper;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SerializationUtil;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -58,7 +60,7 @@ import org.junit.rules.TemporaryFolder;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import static org.apache.iceberg.mr.hive.HiveIcebergRecordWriter.getWriter;
+import static org.apache.iceberg.mr.hive.HiveIcebergRecordWriter.getWriters;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 public class TestHiveIcebergOutputCommitter {
@@ -107,7 +109,7 @@ public class TestHiveIcebergOutputCommitter {
     HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
     Table table = table(temp.getRoot().getPath(), false);
     JobConf conf = jobConf(table, 1);
-    List<Record> expected = writeRecords(1, 0, true, false, conf);
+    List<Record> expected = writeRecords(table.name(), 1, 0, true, false, conf);
     committer.commitJob(new JobContextImpl(conf, JOB_ID));
 
     HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 1);
@@ -119,7 +121,7 @@ public class TestHiveIcebergOutputCommitter {
     HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
     Table table = table(temp.getRoot().getPath(), true);
     JobConf conf = jobConf(table, 1);
-    List<Record> expected = writeRecords(1, 0, true, false, conf);
+    List<Record> expected = writeRecords(table.name(), 1, 0, true, false, conf);
     committer.commitJob(new JobContextImpl(conf, JOB_ID));
 
     HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 3);
@@ -131,7 +133,7 @@ public class TestHiveIcebergOutputCommitter {
     HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
     Table table = table(temp.getRoot().getPath(), false);
     JobConf conf = jobConf(table, 2);
-    List<Record> expected = writeRecords(2, 0, true, false, conf);
+    List<Record> expected = writeRecords(table.name(), 2, 0, true, false, conf);
     committer.commitJob(new JobContextImpl(conf, JOB_ID));
 
     HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 2);
@@ -143,7 +145,7 @@ public class TestHiveIcebergOutputCommitter {
     HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
     Table table = table(temp.getRoot().getPath(), true);
     JobConf conf = jobConf(table, 2);
-    List<Record> expected = writeRecords(2, 0, true, false, conf);
+    List<Record> expected = writeRecords(table.name(), 2, 0, true, false, conf);
     committer.commitJob(new JobContextImpl(conf, JOB_ID));
 
     HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 6);
@@ -157,18 +159,18 @@ public class TestHiveIcebergOutputCommitter {
     JobConf conf = jobConf(table, 2);
 
     // Write records and abort the tasks
-    writeRecords(2, 0, false, true, conf);
+    writeRecords(table.name(), 2, 0, false, true, conf);
     HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 0);
     HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0);
 
     // Write records but do not abort the tasks
     // The data files remain since we can not identify them but should not be read
-    writeRecords(2, 1, false, false, conf);
+    writeRecords(table.name(), 2, 1, false, false, conf);
     HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 2);
     HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0);
 
     // Write and commit the records
-    List<Record> expected = writeRecords(2, 2, true, false, conf);
+    List<Record> expected = writeRecords(table.name(), 2, 2, true, false, conf);
     committer.commitJob(new JobContextImpl(conf, JOB_ID));
     HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 4);
     HiveIcebergTestUtils.validateData(table, expected, 0);
@@ -179,7 +181,7 @@ public class TestHiveIcebergOutputCommitter {
     HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
     Table table = table(temp.getRoot().getPath(), false);
     JobConf conf = jobConf(table, 1);
-    writeRecords(1, 0, true, false, conf);
+    writeRecords(table.name(), 1, 0, true, false, conf);
     committer.abortJob(new JobContextImpl(conf, JOB_ID), JobStatus.State.FAILED);
 
     HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 0);
@@ -198,7 +200,7 @@ public class TestHiveIcebergOutputCommitter {
     Table table = table(temp.getRoot().getPath(), false);
     JobConf conf = jobConf(table, 1);
     try {
-      writeRecords(1, 0, true, false, conf, failingCommitter);
+      writeRecords(table.name(), 1, 0, true, false, conf, failingCommitter);
       Assert.fail();
     } catch (RuntimeException e) {
       Assert.assertTrue(e.getMessage().contains(exceptionMessage));
@@ -207,10 +209,10 @@ public class TestHiveIcebergOutputCommitter {
     Assert.assertEquals(1, argumentCaptor.getAllValues().size());
     TaskAttemptID capturedId = TezUtil.taskAttemptWrapper(argumentCaptor.getValue().getTaskAttemptID());
     // writer is still in the map after commitTask failure
-    Assert.assertNotNull(getWriter(capturedId));
+    Assert.assertNotNull(getWriters(capturedId));
     failingCommitter.abortTask(new TaskAttemptContextImpl(conf, capturedId));
     // abortTask succeeds and removes writer
-    Assert.assertNull(getWriter(capturedId));
+    Assert.assertNull(getWriters(capturedId));
   }
 
   private Table table(String location, boolean partitioned) {
@@ -223,6 +225,8 @@ public class TestHiveIcebergOutputCommitter {
     conf.setNumMapTasks(taskNum);
     conf.setNumReduceTasks(0);
     conf.set(HiveConf.ConfVars.HIVEQUERYID.varname, QUERY_ID);
+    conf.set(InputFormatConfig.OUTPUT_TABLES, table.name());
+    conf.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + table.name(), SerializationUtil.serializeToBase64(table));
 
     Map<String, String> propMap = Maps.newHashMap();
     TableDesc tableDesc = new TableDesc();
@@ -237,6 +241,7 @@ public class TestHiveIcebergOutputCommitter {
   /**
    * Write random records to the given table using separate {@link HiveIcebergOutputCommitter} and
    * a separate {@link HiveIcebergRecordWriter} for every task.
+   * @param name The name of the table to get the table object from the conf
    * @param taskNum The number of tasks in the job handled by the committer
    * @param attemptNum The id used for attempt number generation
    * @param commitTasks If <code>true</code> the tasks will be committed
@@ -247,11 +252,11 @@ public class TestHiveIcebergOutputCommitter {
    * @return The random generated records which were appended to the table
    * @throws IOException Propagating {@link HiveIcebergRecordWriter} exceptions
    */
-  private List<Record> writeRecords(int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
+  private List<Record> writeRecords(String name, int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
                                     JobConf conf, OutputCommitter committer) throws IOException {
     List<Record> expected = new ArrayList<>(RECORD_NUM * taskNum);
 
-    Table table = HiveIcebergStorageHandler.table(conf);
+    Table table = HiveIcebergStorageHandler.table(conf, name);
     FileIO io = table.io();
     LocationProvider location = table.locationProvider();
     EncryptionManager encryption = table.encryption();
@@ -266,7 +271,7 @@ public class TestHiveIcebergOutputCommitter {
               attemptNum, QUERY_ID + "-" + JOB_ID);
       HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, FileFormat.PARQUET,
           new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE,
-          TezUtil.taskAttemptWrapper(taskId));
+          TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME));
 
       Container<Record> container = new Container<>();
 
@@ -287,8 +292,8 @@ public class TestHiveIcebergOutputCommitter {
     return expected;
   }
 
-  private List<Record> writeRecords(int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
+  private List<Record> writeRecords(String name, int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
                                     JobConf conf) throws IOException {
-    return writeRecords(taskNum, attemptNum, commitTasks, abortTasks, conf, new HiveIcebergOutputCommitter());
+    return writeRecords(name, taskNum, attemptNum, commitTasks, abortTasks, conf, new HiveIcebergOutputCommitter());
   }
 }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index fcb4311..41aef8b 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -621,6 +621,47 @@ public class TestHiveIcebergStorageHandlerWithEngine {
     HiveIcebergTestUtils.validateData(table, records, 0);
   }
 
+  @Test
+  public void testMultiTableInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    Schema target1Schema = new Schema(
+        optional(1, "customer_id", Types.LongType.get()),
+        optional(2, "first_name", Types.StringType.get())
+    );
+
+    Schema target2Schema = new Schema(
+        optional(1, "last_name", Types.StringType.get()),
+        optional(2, "customer_id", Types.LongType.get())
+    );
+
+    List<Record> target1Records = TestHelper.RecordsBuilder.newInstance(target1Schema)
+        .add(0L, "Alice")
+        .add(1L, "Bob")
+        .add(2L, "Trudy")
+        .build();
+
+    List<Record> target2Records = TestHelper.RecordsBuilder.newInstance(target2Schema)
+        .add("Brown", 0L)
+        .add("Green", 1L)
+        .add("Pink", 2L)
+        .build();
+
+    Table target1 = testTables.createTable(shell, "target1", target1Schema, fileFormat, ImmutableList.of());
+    Table target2 = testTables.createTable(shell, "target2", target2Schema, fileFormat, ImmutableList.of());
+
+    shell.executeStatement("FROM customers " +
+            "INSERT INTO target1 SELECT customer_id, first_name " +
+            "INSERT INTO target2 SELECT last_name, customer_id");
+
+    // Check that everything is as expected
+    HiveIcebergTestUtils.validateData(target1, target1Records, 0);
+    HiveIcebergTestUtils.validateData(target2, target2Records, 1);
+  }
+
   private void testComplexTypeWrite(Schema schema, List<Record> records) throws IOException {
     String tableName = "complex_table";
     Table table = testTables.createTable(shell, "complex_table", schema, fileFormat, ImmutableList.of());