Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/03 16:32:16 UTC

[GitHub] [iceberg] marton-bod commented on a change in pull request #1861: Hive: OutputCommitter implementation for Hive writes

marton-bod commented on a change in pull request #1861:
URL: https://github.com/apache/iceberg/pull/1861#discussion_r535310601



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/ClosedFileData.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.Serializable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/**
+ * Class for storing the data file properties which are needed for an Icebreg commit.

Review comment:
       nit: typo in the word Iceberg ("Icebreg")

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/ClosedFileData.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.Serializable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/**
+ * Class for storing the data file properties which are needed for an Icebreg commit.
+ * <ul>
+ *   <li>Partition key
+ *   <li>File name
+ *   <li>File format
+ *   <li>File size
+ *   <li>Metrics
+ * </ul>
+ */
+final class ClosedFileData implements Serializable {
+  private final PartitionKey partitionKey;
+  private final String fileName;
+  private final FileFormat fileFormat;
+  private final Long length;
+  private final Metrics metrics;
+
+  ClosedFileData(PartitionKey partitionKey, String fileName, FileFormat fileFormat, Long length, Metrics metrics) {
+    this.partitionKey = partitionKey;
+    this.fileName = fileName;
+    this.fileFormat = fileFormat;
+    this.length = length;
+    this.metrics = metrics;
+  }
+
+  PartitionKey partitionKey() {
+    return partitionKey;
+  }
+
+  String fileName() {
+    return fileName;
+  }
+
+  FileFormat fileFormat() {
+    return fileFormat;
+  }
+
+  Long length() {

Review comment:
       can we rename this to `fileSize()` to align with the javadoc and the two methods above? Also, maybe add a short javadoc comment on what unit we use here (I'm assuming bytes).
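
       For illustration, a minimal sketch of that fragment of `ClosedFileData` (assuming the value is indeed in bytes):

```java
  /**
   * @return the size of the written data file in bytes
   */
  Long fileSize() {
    return length;
  }
```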

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An Iceberg table committer for adding data files to the Iceberg tables.
+ * Currently independent of the Hive ACID transactions.
+ */
+public final class HiveIcebergOutputCommitter extends OutputCommitter {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);
+
+  @Override
+  public void setupJob(JobContext jobContext) {
+    // do nothing.
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) {
+    // do nothing.
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) {
+    // We need to commit if this is the last phase of a MapReduce process
+    return TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) ||
+        context.getJobConf().getNumReduceTasks() == 0;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    TaskAttemptID attemptID = context.getTaskAttemptID();
+    String commitFileLocation = LocationHelper.generateToCommitFileLocation(context.getJobConf(), attemptID);
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID);
+
+    Set<ClosedFileData> closedFiles = Collections.emptySet();
+    if (writer != null) {
+      closedFiles = writer.closedFileData();
+    }
+
+    // Create the committed file for the task
+    createToCommitFile(closedFiles, commitFileLocation, new HadoopFileIO(context.getJobConf()));
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    // Clean up writer data from the local store
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
+
+    // Remove files if it was not done already
+    writer.close(true);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    JobConf conf = jobContext.getJobConf();
+    // If there are reducers, then every reducer will generate a result file.
+    // If this is a map only task, then every mapper will generate a result file.
+    int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
+    Table table = Catalogs.loadTable(conf);
+
+    long startTime = System.currentTimeMillis();
+    LOG.info("Committing job is started for {} using {} expecting {} file(s)", table,
+        LocationHelper.generateJobLocation(conf, jobContext.getJobID()), expectedFiles);
+
+    ExecutorService executor = null;
+    try {
+      // Creating executor service for parallel handling of file reads
+      executor = Executors.newFixedThreadPool(
+          conf.getInt(InputFormatConfig.COMMIT_THREAD_POOL_SIZE, InputFormatConfig.COMMIT_THREAD_POOL_SIZE_DEFAULT),
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setPriority(Thread.NORM_PRIORITY)
+              .setNameFormat("iceberg-commit-pool-%d")
+              .build());
+
+      Set<DataFile> dataFiles = new ConcurrentHashMap<>().newKeySet();
+
+      // Reading the committed files. The assumption here is that the taskIds are generated in sequential order
+      // starting from 0.
+      Tasks.range(expectedFiles)
+          .executeWith(executor)
+          .retry(3)
+          .run(taskId -> {
+            String taskFileName = LocationHelper.generateToCommitFileLocation(conf, jobContext.getJobID(), taskId);
+            Set<ClosedFileData> closedFiles = readToCommitFile(taskFileName, table.io());
+
+            // If the data is not empty add to the table
+            if (!closedFiles.isEmpty()) {
+              closedFiles.forEach(file -> {
+                DataFiles.Builder builder = DataFiles.builder(table.spec())
+                    .withPath(file.fileName())
+                    .withFormat(file.fileFormat())
+                    .withFileSizeInBytes(file.length())
+                    .withPartition(file.partitionKey())
+                    .withMetrics(file.metrics());
+                dataFiles.add(builder.build());
+              });
+            }
+          });
+
+      if (dataFiles.size() > 0) {
+        // Appending data files to the table
+        AppendFiles append = table.newAppend();
+        Set<String> addedFiles = new HashSet<>(dataFiles.size());
+        dataFiles.forEach(dataFile -> {
+          append.appendFile(dataFile);
+          addedFiles.add(dataFile.path().toString());
+        });
+        append.commit();
+        LOG.info("Commit for Iceberg write taken {} ms for {} with file(s) {}",
+            System.currentTimeMillis() - startTime, table, addedFiles);
+      } else {
+        LOG.info("Commit for Iceberg write taken {} ms for {} with no new files",
+            System.currentTimeMillis() - startTime, table);
+      }
+
+      // Calling super to cleanupJob if something more is needed
+      cleanupJob(jobContext);
+
+    } finally {
+      if (executor != null) {
+        executor.shutdown();
+      }
+    }
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, int status) throws IOException {
+    String location = LocationHelper.generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
+    LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location);
+
+    // Remove the result directory for the failed job
+    Tasks.foreach(location)

Review comment:
       just to clarify: is `Tasks` only used here for the retry feature? `location` seems to be a single string, so I guess there'd be no parallel execution here.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class HiveIcebergRecordWriter extends org.apache.hadoop.mapreduce.RecordWriter<NullWritable, Container>
+    implements FileSinkOperator.RecordWriter, org.apache.hadoop.mapred.RecordWriter<NullWritable, Container> {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergRecordWriter.class);
+
+  // <TaskAttemptId, HiveIcebergRecordWriter> map to store the active writers
+  // Stored in concurrent map, since some executor engines can share containers
+  private static Map<TaskAttemptID, HiveIcebergRecordWriter> writers = new ConcurrentHashMap<>();
+
+  private final FileIO io;
+  private final String location;
+  private final FileFormat fileFormat;
+  private final GenericAppenderFactory appenderFactory;
+  // The current key is reused at every write to avoid unnecessary object creation
+  private final PartitionKey currentKey;
+  // Data for every partition is written to a different appender
+  // This map stores the open appenders for the given partition key
+  private final Map<PartitionKey, AppenderWrapper> openAppenders = new HashMap<>();
+  // When the appenders are closed the file data needed for the Iceberg commit is stored and accessible through
+  // this map
+  private final Map<PartitionKey, ClosedFileData> closedFileData = new HashMap<>();
+
+  HiveIcebergRecordWriter(TaskAttemptID taskAttemptID, Configuration conf, String location, FileFormat fileFormat,
+                          Schema schema, PartitionSpec spec) {
+    this.io = new HadoopFileIO(conf);
+    this.location = location;
+    this.fileFormat = fileFormat;
+    this.appenderFactory = new GenericAppenderFactory(schema);
+    this.currentKey = new PartitionKey(spec, schema);
+    writers.put(taskAttemptID, this);
+    LOG.info("IcebergRecordWriter is created in {} with {}", location, fileFormat);
+  }
+
+  @Override
+  public void write(Writable row) {
+    Preconditions.checkArgument(row instanceof Container);

Review comment:
       can you add a short comment on why this needs to be a `Container` instance?
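
       e.g. one possible comment plus a message on the check (the stated reason is just my reading of the `write()` cast below, assuming the Iceberg SerDe wraps each `Record` in a `Container`):

```java
    // Rows arrive wrapped in a Container<Record> (presumably produced by the Iceberg SerDe),
    // so the writer only accepts that wrapper and unwraps it to get the Record to append.
    Preconditions.checkArgument(row instanceof Container, "Expected Container, but got: %s",
        row.getClass().getName());
```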

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An Iceberg table committer for adding data files to the Iceberg tables.
+ * Currently independent of the Hive ACID transactions.
+ */
+public final class HiveIcebergOutputCommitter extends OutputCommitter {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);
+
+  @Override
+  public void setupJob(JobContext jobContext) {
+    // do nothing.
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) {
+    // do nothing.
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) {
+    // We need to commit if this is the last phase of a MapReduce process
+    return TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) ||
+        context.getJobConf().getNumReduceTasks() == 0;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    TaskAttemptID attemptID = context.getTaskAttemptID();
+    String commitFileLocation = LocationHelper.generateToCommitFileLocation(context.getJobConf(), attemptID);
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID);
+
+    Set<ClosedFileData> closedFiles = Collections.emptySet();
+    if (writer != null) {
+      closedFiles = writer.closedFileData();
+    }
+
+    // Create the committed file for the task
+    createToCommitFile(closedFiles, commitFileLocation, new HadoopFileIO(context.getJobConf()));
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    // Clean up writer data from the local store
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
+
+    // Remove files if it was not done already
+    writer.close(true);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    JobConf conf = jobContext.getJobConf();
+    // If there are reducers, then every reducer will generate a result file.
+    // If this is a map only task, then every mapper will generate a result file.
+    int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
+    Table table = Catalogs.loadTable(conf);
+
+    long startTime = System.currentTimeMillis();
+    LOG.info("Committing job is started for {} using {} expecting {} file(s)", table,
+        LocationHelper.generateJobLocation(conf, jobContext.getJobID()), expectedFiles);
+
+    ExecutorService executor = null;
+    try {
+      // Creating executor service for parallel handling of file reads
+      executor = Executors.newFixedThreadPool(
+          conf.getInt(InputFormatConfig.COMMIT_THREAD_POOL_SIZE, InputFormatConfig.COMMIT_THREAD_POOL_SIZE_DEFAULT),
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setPriority(Thread.NORM_PRIORITY)
+              .setNameFormat("iceberg-commit-pool-%d")
+              .build());
+
+      Set<DataFile> dataFiles = new ConcurrentHashMap<>().newKeySet();
+
+      // Reading the committed files. The assumption here is that the taskIds are generated in sequential order
+      // starting from 0.
+      Tasks.range(expectedFiles)
+          .executeWith(executor)
+          .retry(3)
+          .run(taskId -> {
+            String taskFileName = LocationHelper.generateToCommitFileLocation(conf, jobContext.getJobID(), taskId);
+            Set<ClosedFileData> closedFiles = readToCommitFile(taskFileName, table.io());
+
+            // If the data is not empty add to the table
+            if (!closedFiles.isEmpty()) {
+              closedFiles.forEach(file -> {
+                DataFiles.Builder builder = DataFiles.builder(table.spec())
+                    .withPath(file.fileName())
+                    .withFormat(file.fileFormat())
+                    .withFileSizeInBytes(file.length())
+                    .withPartition(file.partitionKey())
+                    .withMetrics(file.metrics());
+                dataFiles.add(builder.build());
+              });
+            }
+          });
+
+      if (dataFiles.size() > 0) {
+        // Appending data files to the table
+        AppendFiles append = table.newAppend();
+        Set<String> addedFiles = new HashSet<>(dataFiles.size());
+        dataFiles.forEach(dataFile -> {
+          append.appendFile(dataFile);
+          addedFiles.add(dataFile.path().toString());
+        });
+        append.commit();
+        LOG.info("Commit for Iceberg write taken {} ms for {} with file(s) {}",
+            System.currentTimeMillis() - startTime, table, addedFiles);
+      } else {
+        LOG.info("Commit for Iceberg write taken {} ms for {} with no new files",
+            System.currentTimeMillis() - startTime, table);
+      }
+
+      // Calling super to cleanupJob if something more is needed
+      cleanupJob(jobContext);
+
+    } finally {
+      if (executor != null) {
+        executor.shutdown();
+      }
+    }
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, int status) throws IOException {
+    String location = LocationHelper.generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
+    LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location);
+
+    // Remove the result directory for the failed job
+    Tasks.foreach(location)
+        .retry(3)
+        .suppressFailureWhenFinished()
+        .onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} on abort job", file, exc))
+        .run(file -> {
+          Path toDelete = new Path(file);
+          FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf());
+          try {
+            fs.delete(toDelete, true /* recursive */);
+          } catch (IOException e) {
+            throw new RuntimeIOException(e, "Failed to delete job directory: %s", file);
+          }
+        });
+    cleanupJob(jobContext);
+  }
+
+  private static void createToCommitFile(Set<ClosedFileData> closedFiles, String location, FileIO io)
+      throws IOException {
+
+    OutputFile commitFile = io.newOutputFile(location);
+    ObjectOutputStream oos = new ObjectOutputStream(commitFile.createOrOverwrite());

Review comment:
       can we use try-with-resources here to make sure the stream is closed?
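
       e.g. a sketch of what I mean (assuming the rest of the method just serializes the set with `writeObject`):

```java
  private static void createToCommitFile(Set<ClosedFileData> closedFiles, String location, FileIO io)
      throws IOException {
    OutputFile commitFile = io.newOutputFile(location);
    // try-with-resources guarantees the stream is closed even if writeObject throws
    try (ObjectOutputStream oos = new ObjectOutputStream(commitFile.createOrOverwrite())) {
      oos.writeObject(closedFiles);
    }
  }
```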

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An Iceberg table committer for adding data files to the Iceberg tables.
+ * Currently independent of the Hive ACID transactions.
+ */
+public final class HiveIcebergOutputCommitter extends OutputCommitter {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);
+
+  @Override
+  public void setupJob(JobContext jobContext) {
+    // do nothing.
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) {
+    // do nothing.
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) {
+    // We need to commit if this is the last phase of a MapReduce process
+    return TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) ||
+        context.getJobConf().getNumReduceTasks() == 0;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    TaskAttemptID attemptID = context.getTaskAttemptID();
+    String commitFileLocation = LocationHelper.generateToCommitFileLocation(context.getJobConf(), attemptID);
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID);
+
+    Set<ClosedFileData> closedFiles = Collections.emptySet();
+    if (writer != null) {
+      closedFiles = writer.closedFileData();
+    }
+
+    // Create the committed file for the task
+    createToCommitFile(closedFiles, commitFileLocation, new HadoopFileIO(context.getJobConf()));
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    // Clean up writer data from the local store
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
+
+    // Remove files if it was not done already
+    writer.close(true);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    JobConf conf = jobContext.getJobConf();
+    // If there are reducers, then every reducer will generate a result file.
+    // If this is a map only task, then every mapper will generate a result file.
+    int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
+    Table table = Catalogs.loadTable(conf);
+
+    long startTime = System.currentTimeMillis();
+    LOG.info("Committing job is started for {} using {} expecting {} file(s)", table,
+        LocationHelper.generateJobLocation(conf, jobContext.getJobID()), expectedFiles);
+
+    ExecutorService executor = null;
+    try {
+      // Creating executor service for parallel handling of file reads
+      executor = Executors.newFixedThreadPool(
+          conf.getInt(InputFormatConfig.COMMIT_THREAD_POOL_SIZE, InputFormatConfig.COMMIT_THREAD_POOL_SIZE_DEFAULT),
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setPriority(Thread.NORM_PRIORITY)
+              .setNameFormat("iceberg-commit-pool-%d")
+              .build());
+
+      Set<DataFile> dataFiles = new ConcurrentHashMap<>().newKeySet();
+
+      // Reading the committed files. The assumption here is that the taskIds are generated in sequential order
+      // starting from 0.
+      Tasks.range(expectedFiles)
+          .executeWith(executor)
+          .retry(3)
+          .run(taskId -> {
+            String taskFileName = LocationHelper.generateToCommitFileLocation(conf, jobContext.getJobID(), taskId);
+            Set<ClosedFileData> closedFiles = readToCommitFile(taskFileName, table.io());
+
+            // If the data is not empty add to the table
+            if (!closedFiles.isEmpty()) {
+              closedFiles.forEach(file -> {
+                DataFiles.Builder builder = DataFiles.builder(table.spec())
+                    .withPath(file.fileName())
+                    .withFormat(file.fileFormat())
+                    .withFileSizeInBytes(file.length())
+                    .withPartition(file.partitionKey())
+                    .withMetrics(file.metrics());
+                dataFiles.add(builder.build());
+              });
+            }
+          });
+
+      if (dataFiles.size() > 0) {
+        // Appending data files to the table
+        AppendFiles append = table.newAppend();
+        Set<String> addedFiles = new HashSet<>(dataFiles.size());
+        dataFiles.forEach(dataFile -> {
+          append.appendFile(dataFile);
+          addedFiles.add(dataFile.path().toString());
+        });
+        append.commit();
+        LOG.info("Commit for Iceberg write taken {} ms for {} with file(s) {}",

Review comment:
       nit phrasing: "Commit took {} ms for table: {} with file(s): {}"

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An Iceberg table committer for adding data files to the Iceberg tables.
+ * Currently independent of the Hive ACID transactions.
+ */
+public final class HiveIcebergOutputCommitter extends OutputCommitter {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);
+
+  @Override
+  public void setupJob(JobContext jobContext) {
+    // do nothing.
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) {
+    // do nothing.
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) {
+    // We need to commit if this is the last phase of a MapReduce process
+    return TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) ||
+        context.getJobConf().getNumReduceTasks() == 0;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    TaskAttemptID attemptID = context.getTaskAttemptID();
+    String commitFileLocation = LocationHelper.generateToCommitFileLocation(context.getJobConf(), attemptID);
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID);
+
+    Set<ClosedFileData> closedFiles = Collections.emptySet();
+    if (writer != null) {
+      closedFiles = writer.closedFileData();
+    }
+
+    // Create the committed file for the task
+    createToCommitFile(closedFiles, commitFileLocation, new HadoopFileIO(context.getJobConf()));
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    // Clean up writer data from the local store
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
+
+    // Remove files if it was not done already
+    writer.close(true);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    JobConf conf = jobContext.getJobConf();
+    // If there are reducers, then every reducer will generate a result file.
+    // If this is a map only task, then every mapper will generate a result file.
+    int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
+    Table table = Catalogs.loadTable(conf);
+
+    long startTime = System.currentTimeMillis();
+    LOG.info("Committing job is started for {} using {} expecting {} file(s)", table,

Review comment:
       nit: it might help to include labels for the placeholders,
       e.g. "Committing job has started for table: {}, using location: {}, expecting {} file(s)."

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An Iceberg table committer for adding data files to the Iceberg tables.
+ * Currently independent of the Hive ACID transactions.
+ */
+public final class HiveIcebergOutputCommitter extends OutputCommitter {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);
+
+  @Override
+  public void setupJob(JobContext jobContext) {
+    // do nothing.
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) {
+    // do nothing.
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) {
+    // We need to commit if this is the last phase of a MapReduce process
+    return TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) ||
+        context.getJobConf().getNumReduceTasks() == 0;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    TaskAttemptID attemptID = context.getTaskAttemptID();
+    String commitFileLocation = LocationHelper.generateToCommitFileLocation(context.getJobConf(), attemptID);
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID);
+
+    Set<ClosedFileData> closedFiles = Collections.emptySet();
+    if (writer != null) {
+      closedFiles = writer.closedFileData();
+    }
+
+    // Create the committed file for the task
+    createToCommitFile(closedFiles, commitFileLocation, new HadoopFileIO(context.getJobConf()));
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    // Clean up writer data from the local store
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
+
+    // Remove files if it was not done already
+    writer.close(true);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    JobConf conf = jobContext.getJobConf();
+    // If there are reducers, then every reducer will generate a result file.
+    // If this is a map only task, then every mapper will generate a result file.
+    int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
+    Table table = Catalogs.loadTable(conf);
+
+    long startTime = System.currentTimeMillis();
+    LOG.info("Committing job is started for {} using {} expecting {} file(s)", table,
+        LocationHelper.generateJobLocation(conf, jobContext.getJobID()), expectedFiles);
+
+    ExecutorService executor = null;
+    try {
+      // Creating executor service for parallel handling of file reads
+      executor = Executors.newFixedThreadPool(
+          conf.getInt(InputFormatConfig.COMMIT_THREAD_POOL_SIZE, InputFormatConfig.COMMIT_THREAD_POOL_SIZE_DEFAULT),
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setPriority(Thread.NORM_PRIORITY)
+              .setNameFormat("iceberg-commit-pool-%d")
+              .build());
+
+      Set<DataFile> dataFiles = new ConcurrentHashMap<>().newKeySet();
+
+      // Reading the committed files. The assumption here is that the taskIds are generated in sequential order
+      // starting from 0.
+      Tasks.range(expectedFiles)
+          .executeWith(executor)
+          .retry(3)

Review comment:
       do we want to make the retry count configurable?
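
       e.g. something like the following (the property name and default are only illustrative, not an existing `InputFormatConfig` constant):

```java
      // Hypothetical property; would need a matching constant and default in InputFormatConfig
      int commitRetries = conf.getInt("iceberg.mr.commit.retry.count", 3);

      Tasks.range(expectedFiles)
          .executeWith(executor)
          .retry(commitRetries)
          .run(taskId -> {
            // same body as above
          });
```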

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An Iceberg table committer for adding data files to the Iceberg tables.
+ * Currently independent of the Hive ACID transactions.
+ */
+public final class HiveIcebergOutputCommitter extends OutputCommitter {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);
+
+  @Override
+  public void setupJob(JobContext jobContext) {
+    // do nothing.
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskAttemptContext) {
+    // do nothing.
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) {
+    // We need to commit if this is the last phase of a MapReduce process
+    return TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) ||
+        context.getJobConf().getNumReduceTasks() == 0;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    TaskAttemptID attemptID = context.getTaskAttemptID();
+    String commitFileLocation = LocationHelper.generateToCommitFileLocation(context.getJobConf(), attemptID);
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID);
+
+    Set<ClosedFileData> closedFiles = Collections.emptySet();
+    if (writer != null) {
+      closedFiles = writer.closedFileData();
+    }
+
+    // Create the committed file for the task
+    createToCommitFile(closedFiles, commitFileLocation, new HadoopFileIO(context.getJobConf()));
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    // Clean up writer data from the local store
+    HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
+
+    // Remove files if it was not done already
+    writer.close(true);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    JobConf conf = jobContext.getJobConf();
+    // If there are reducers, then every reducer will generate a result file.
+    // If this is a map only task, then every mapper will generate a result file.
+    int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
+    Table table = Catalogs.loadTable(conf);
+
+    long startTime = System.currentTimeMillis();
+    LOG.info("Committing job is started for {} using {} expecting {} file(s)", table,
+        LocationHelper.generateJobLocation(conf, jobContext.getJobID()), expectedFiles);
+
+    ExecutorService executor = null;
+    try {
+      // Creating executor service for parallel handling of file reads
+      executor = Executors.newFixedThreadPool(
+          conf.getInt(InputFormatConfig.COMMIT_THREAD_POOL_SIZE, InputFormatConfig.COMMIT_THREAD_POOL_SIZE_DEFAULT),
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setPriority(Thread.NORM_PRIORITY)
+              .setNameFormat("iceberg-commit-pool-%d")
+              .build());
+
+      Set<DataFile> dataFiles = new ConcurrentHashMap<>().newKeySet();
+
+      // Reading the committed files. The assumption here is that the taskIds are generated in sequential order
+      // starting from 0.
+      Tasks.range(expectedFiles)
+          .executeWith(executor)
+          .retry(3)
+          .run(taskId -> {
+            String taskFileName = LocationHelper.generateToCommitFileLocation(conf, jobContext.getJobID(), taskId);
+            Set<ClosedFileData> closedFiles = readToCommitFile(taskFileName, table.io());
+
+            // If the data is not empty add to the table
+            if (!closedFiles.isEmpty()) {
+              closedFiles.forEach(file -> {
+                DataFiles.Builder builder = DataFiles.builder(table.spec())
+                    .withPath(file.fileName())
+                    .withFormat(file.fileFormat())
+                    .withFileSizeInBytes(file.length())
+                    .withPartition(file.partitionKey())
+                    .withMetrics(file.metrics());
+                dataFiles.add(builder.build());
+              });
+            }
+          });
+
+      if (dataFiles.size() > 0) {
+        // Appending data files to the table
+        AppendFiles append = table.newAppend();
+        Set<String> addedFiles = new HashSet<>(dataFiles.size());
+        dataFiles.forEach(dataFile -> {
+          append.appendFile(dataFile);
+          addedFiles.add(dataFile.path().toString());
+        });
+        append.commit();
+        LOG.info("Commit for Iceberg write taken {} ms for {} with file(s) {}",
+            System.currentTimeMillis() - startTime, table, addedFiles);
+      } else {
+        LOG.info("Commit for Iceberg write taken {} ms for {} with no new files",
+            System.currentTimeMillis() - startTime, table);
+      }
+
+      // Calling super to cleanupJob if something more is needed
+      cleanupJob(jobContext);
+
+    } finally {
+      if (executor != null) {
+        executor.shutdown();
+      }
+    }
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, int status) throws IOException {
+    String location = LocationHelper.generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
+    LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location);
+
+    // Remove the result directory for the failed job
+    Tasks.foreach(location)
+        .retry(3)
+        .suppressFailureWhenFinished()
+        .onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} on abort job", file, exc))
+        .run(file -> {
+          Path toDelete = new Path(file);
+          FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf());
+          try {
+            fs.delete(toDelete, true /* recursive */);
+          } catch (IOException e) {
+            throw new RuntimeIOException(e, "Failed to delete job directory: %s", file);
+          }
+        });
+    cleanupJob(jobContext);
+  }
+
+  private static void createToCommitFile(Set<ClosedFileData> closedFiles, String location, FileIO io)

Review comment:
       I know this is named `ToCommit` because of the file extension introduced in the Writer. But I personally find this wording a little confusing here (especially before scrolling down to the Writer code). What do you think about naming this something like `createFileForCommit`? Similarly for the read method.
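
       i.e. something along these lines (the names are just suggestions):

```java
  private static void createFileForCommit(Set<ClosedFileData> closedFiles, String location, FileIO io)
      throws IOException {
    // same body as createToCommitFile above
  }

  private static Set<ClosedFileData> readFileForCommit(String fileForCommitLocation, FileIO io) {
    // same body as readToCommitFile above
  }
```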

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class HiveIcebergRecordWriter extends org.apache.hadoop.mapreduce.RecordWriter<NullWritable, Container>
+    implements FileSinkOperator.RecordWriter, org.apache.hadoop.mapred.RecordWriter<NullWritable, Container> {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergRecordWriter.class);
+
+  // <TaskAttemptId, HiveIcebergRecordWriter> map to store the active writers
+  // Stored in concurrent map, since some executor engines can share containers
+  private static Map<TaskAttemptID, HiveIcebergRecordWriter> writers = new ConcurrentHashMap<>();
+
+  private final FileIO io;
+  private final String location;
+  private final FileFormat fileFormat;
+  private final GenericAppenderFactory appenderFactory;
+  // The current key is reused at every write to avoid unnecessary object creation
+  private final PartitionKey currentKey;
+  // Data for every partition is written to a different appender
+  // This map stores the open appenders for the given partition key
+  private final Map<PartitionKey, AppenderWrapper> openAppenders = new HashMap<>();
+  // When the appenders are closed the file data needed for the Iceberg commit is stored and accessible through
+  // this map
+  private final Map<PartitionKey, ClosedFileData> closedFileData = new HashMap<>();
+
+  HiveIcebergRecordWriter(TaskAttemptID taskAttemptID, Configuration conf, String location, FileFormat fileFormat,
+                          Schema schema, PartitionSpec spec) {
+    this.io = new HadoopFileIO(conf);
+    this.location = location;
+    this.fileFormat = fileFormat;
+    this.appenderFactory = new GenericAppenderFactory(schema);
+    this.currentKey = new PartitionKey(spec, schema);
+    writers.put(taskAttemptID, this);
+    LOG.info("IcebergRecordWriter is created in {} with {}", location, fileFormat);
+  }
+
+  @Override
+  public void write(Writable row) {
+    Preconditions.checkArgument(row instanceof Container);
+
+    Record record = ((Container<Record>) row).get();
+
+    currentKey.partition(record);
+
+    AppenderWrapper currentAppender = openAppenders.get(currentKey);
+    if (currentAppender == null) {
+      currentAppender = getAppender();
+      openAppenders.put(currentKey.copy(), currentAppender);
+    }
+
+    currentAppender.appender.add(record);
+  }
+
+  @Override
+  public void write(NullWritable key, Container value) {
+    write(value);
+  }
+
+  @Override
+  public void close(boolean abort) throws IOException {
+    // Close the open appenders and store the closed file data
+    for (PartitionKey key : openAppenders.keySet()) {
+      AppenderWrapper wrapper = openAppenders.get(key);
+      wrapper.close();
+      closedFileData.put(key,
+          new ClosedFileData(key, wrapper.location, fileFormat, wrapper.length(), wrapper.metrics()));
+    }
+
+    openAppenders.clear();
+
+    // If abort then remove the unnecessary files
+    if (abort) {
+      Tasks.foreach(closedFileData.values().stream().map(ClosedFileData::fileName).iterator())
+          .retry(3)
+          .suppressFailureWhenFinished()
+          .onFailure((file, exception) -> LOG.debug("Failed on to remove file {} on abort", file, exception))
+          .run(io::deleteFile);
+    }
+    LOG.info("IcebergRecordWriter is closed. Created {} files", closedFileData);

Review comment:
       did you mean `closedFileData.size()`, or did you want to list the map contents?
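
       i.e. if the intent is the count, something like:

```java
    LOG.info("IcebergRecordWriter is closed. Created {} file(s)", closedFileData.size());
```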

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.iceberg.mr.InputFormatConfig;
+
+class LocationHelper {
+  private static final String TO_COMMIT_EXTENSION = ".toCommit";
+
+  private LocationHelper() {
+  }
+
+  /**
+   * Generates query directory location based on the configuration.
+   * Currently it uses tableLocation/queryId
+   * @param conf The job's configuration
+   * @return The directory to store the query result files
+   */
+  static String generateQueryLocation(Configuration conf) {
+    String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION);
+    String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+    return tableLocation + "/" + queryId;
+  }
+
+  /**
+   * Generates the job temp location based on the job configuration.
+   * Currently it uses QUERY_LOCATION/jobId.
+   * @param conf The job's configuration
+   * @param jobId The JobID for the task
+   * @return The file to store the results
+   */
+  static String generateJobLocation(Configuration conf, JobID jobId) {
+    return generateQueryLocation(conf) + "/" + jobId;
+  }
+
+  /**
+   * Generates datafile location based on the task configuration.
+   * Currently it uses QUERY_LOCATION/jobId/taskAttemptId.
+   * @param conf The job's configuration
+   * @param taskAttemptId The TaskAttemptID for the task
+   * @return The file to store the results
+   */
+  static String generateDataFileLocation(Configuration conf, TaskAttemptID taskAttemptId) {
+    return generateJobLocation(conf, taskAttemptId.getJobID()) + "/" + taskAttemptId.toString();
+  }
+
+  /**
+   * Generates file location based on the task configuration and a specific task id.
+   * This file will be used to store the data required to generate the Iceberg commit.
+   * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks].toCommit.

Review comment:
       shouldn't this be `task-[0..numTasks).toCommit`? (open interval on the right)

##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.junit.Assert;
+
+public class HiveIcebergTestUtils {

Review comment:
       Just to check: will this be merged with the new class of the same name from https://github.com/apache/iceberg/pull/1854?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org