You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/03/30 12:07:38 UTC
[iceberg] branch master updated: Hive: Implement multi-table inserts (#2228)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d151034 Hive: Implement multi-table inserts (#2228)
d151034 is described below
commit d1510340eaff68d88a2e8194d58e7e493af02bcc
Author: pvary <pv...@cloudera.com>
AuthorDate: Tue Mar 30 14:07:23 2021 +0200
Hive: Implement multi-table inserts (#2228)
---
.../org/apache/iceberg/mr/InputFormatConfig.java | 9 +-
.../mr/hive/HiveIcebergOutputCommitter.java | 316 ++++++++++++++-------
.../iceberg/mr/hive/HiveIcebergOutputFormat.java | 8 +-
.../iceberg/mr/hive/HiveIcebergRecordWriter.java | 15 +-
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 30 +-
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 11 +-
.../iceberg/mr/hive/HiveIcebergTestUtils.java | 3 +-
.../mr/hive/TestHiveIcebergOutputCommitter.java | 39 +--
.../TestHiveIcebergStorageHandlerWithEngine.java | 41 +++
9 files changed, 331 insertions(+), 141 deletions(-)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index e39ba3c..38e6606 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -46,7 +46,7 @@ public class InputFormatConfig {
public static final String TABLE_LOCATION = "iceberg.mr.table.location";
public static final String TABLE_SCHEMA = "iceberg.mr.table.schema";
public static final String PARTITION_SPEC = "iceberg.mr.table.partition.spec";
- public static final String SERIALIZED_TABLE = "iceberg.mr.serialized.table";
+ public static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";
public static final String LOCALITY = "iceberg.mr.locality";
public static final String CATALOG = "iceberg.mr.catalog";
public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = "iceberg.mr.catalog.hadoop.warehouse.location";
@@ -54,8 +54,11 @@ public class InputFormatConfig {
public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
public static final String EXTERNAL_TABLE_PURGE = "external.table.purge";
- public static final String COMMIT_THREAD_POOL_SIZE = "iceberg.mr.commit.thread.pool.size";
- public static final int COMMIT_THREAD_POOL_SIZE_DEFAULT = 10;
+ public static final String OUTPUT_TABLES = "iceberg.mr.output.tables";
+ public static final String COMMIT_TABLE_THREAD_POOL_SIZE = "iceberg.mr.commit.table.thread.pool.size";
+ public static final int COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT = 10;
+ public static final String COMMIT_FILE_THREAD_POOL_SIZE = "iceberg.mr.commit.file.thread.pool.size";
+ public static final int COMMIT_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
public static final String WRITE_TARGET_FILE_SIZE = "iceberg.mr.write.target.file.size";
public static final String CASE_SENSITIVE = "iceberg.mr.case.sensitive";
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index d55a142..3479b36 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -22,10 +22,11 @@ package org.apache.iceberg.mr.hive;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
@@ -90,22 +91,36 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
TaskAttemptID attemptID = context.getTaskAttemptID();
- String fileForCommitLocation = generateFileForCommitLocation(context.getJobConf(),
- attemptID.getJobID(), attemptID.getTaskID().getId());
- HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.getWriter(attemptID);
+ JobConf jobConf = context.getJobConf();
+ Map<String, HiveIcebergRecordWriter> writers = HiveIcebergRecordWriter.getWriters(attemptID);
+ Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
- DataFile[] closedFiles;
- if (writer != null) {
- closedFiles = writer.dataFiles();
- } else {
- closedFiles = new DataFile[0];
+ ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+ try {
+ // Generates commit files for the target tables in parallel
+ Tasks.foreach(outputs)
+ .retry(3)
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .executeWith(tableExecutor)
+ .run(output -> {
+ Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
+ HiveIcebergRecordWriter writer = writers.get(output);
+ DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0];
+ String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
+ attemptID.getJobID(), attemptID.getTaskID().getId());
+
+ // Creating the file containing the data files generated by this task for this table
+ createFileForCommit(closedFiles, fileForCommitLocation, table.io());
+ }, IOException.class);
+ } finally {
+ if (tableExecutor != null) {
+ tableExecutor.shutdown();
+ }
}
- // Creating the file containing the data files generated by this task
- createFileForCommit(closedFiles, fileForCommitLocation, HiveIcebergStorageHandler.table(context.getJobConf()).io());
-
// remove the writer to release the object
- HiveIcebergRecordWriter.removeWriter(attemptID);
+ HiveIcebergRecordWriter.removeWriters(attemptID);
}
/**
@@ -118,52 +133,61 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
// Clean up writer data from the local store
- HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
+ Map<String, HiveIcebergRecordWriter> writers = HiveIcebergRecordWriter.removeWriters(context.getTaskAttemptID());
// Remove files if it was not done already
- if (writer != null) {
- writer.close(true);
+ if (writers != null) {
+ for (HiveIcebergRecordWriter writer : writers.values()) {
+ writer.close(true);
+ }
}
}
/**
- * Reads the commit files stored in the temp directory and collects the generated committed data files.
- * Appends the data files to the table. At the end removes the temporary directory.
+ * Reads the commit files stored in the temp directories and collects the generated committed data files.
+ * Appends the data files to the tables. At the end removes the temporary directories.
* @param originalContext The job context
- * @throws IOException if there is a failure deleting the files
+ * @throws IOException if there is a failure accessing the files
*/
@Override
public void commitJob(JobContext originalContext) throws IOException {
JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
-
- JobConf conf = jobContext.getJobConf();
- Table table = Catalogs.loadTable(conf);
+ JobConf jobConf = jobContext.getJobConf();
long startTime = System.currentTimeMillis();
- LOG.info("Committing job has started for table: {}, using location: {}", table,
- generateJobLocation(conf, jobContext.getJobID()));
+ LOG.info("Committing job {} has started", jobContext.getJobID());
- FileIO io = HiveIcebergStorageHandler.table(jobContext.getJobConf()).io();
- List<DataFile> dataFiles = dataFiles(jobContext, io, true);
+ Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+ Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
- if (dataFiles.size() > 0) {
- // Appending data files to the table
- AppendFiles append = table.newAppend();
- dataFiles.forEach(append::appendFile);
- append.commit();
- LOG.info("Commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime, table,
- dataFiles.size());
- LOG.debug("Added files {}", dataFiles);
- } else {
- LOG.info("Commit took {} ms for table: {} with no new files", System.currentTimeMillis() - startTime, table);
+ ExecutorService fileExecutor = fileExecutor(jobConf);
+ ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+ try {
+ // Commits the changes for the output tables in parallel
+ Tasks.foreach(outputs)
+ .throwFailureWhenFinished()
+ .stopOnFailure()
+ .executeWith(tableExecutor)
+ .run(output -> {
+ Table table = HiveIcebergStorageHandler.table(jobConf, output);
+ jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
+ commitTable(table.io(), fileExecutor, jobContext, output, table.location());
+ });
+ } finally {
+ fileExecutor.shutdown();
+ if (tableExecutor != null) {
+ tableExecutor.shutdown();
+ }
}
- cleanup(jobContext);
+ LOG.info("Commit took {} ms for job {}", System.currentTimeMillis() - startTime, jobContext.getJobID());
+
+ cleanup(jobContext, jobLocations);
}
/**
- * Removes the generated data files, if there is a commit file already generated for them.
- * The cleanup at the end removes the temporary directory as well.
+ * Removes the generated data files if there is a commit file already generated for them.
+ * The cleanup at the end removes the temporary directories as well.
* @param originalContext The job context
* @param status The status of the job
* @throws IOException if there is a failure deleting the files
@@ -171,117 +195,207 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
@Override
public void abortJob(JobContext originalContext, int status) throws IOException {
JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
+ JobConf jobConf = jobContext.getJobConf();
- String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
- LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location);
-
- FileIO io = HiveIcebergStorageHandler.table(jobContext.getJobConf()).io();
- List<DataFile> dataFiles = dataFiles(jobContext, io, false);
+ LOG.info("Job {} is aborted. Data file cleaning started", jobContext.getJobID());
+ Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+ Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
- // Check if we have files already committed and remove data files if there are any
- if (dataFiles.size() > 0) {
- Tasks.foreach(dataFiles)
- .retry(3)
+ ExecutorService fileExecutor = fileExecutor(jobConf);
+ ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+ try {
+ // Cleans up the changes for the output tables in parallel
+ Tasks.foreach(outputs)
.suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.debug("Failed on to remove data file {} on abort job", file.path(), exc))
- .run(file -> io.deleteFile(file.path().toString()));
+ .executeWith(tableExecutor)
+ .onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
+ .run(output -> {
+ LOG.info("Cleaning job for table {}", jobContext.getJobID(), output);
+
+ Table table = HiveIcebergStorageHandler.table(jobConf, output);
+ jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
+ Collection<DataFile> dataFiles = dataFiles(fileExecutor, table.location(), jobContext, table.io(), false);
+
+ // Check if we have files already committed and remove data files if there are any
+ if (dataFiles.size() > 0) {
+ Tasks.foreach(dataFiles)
+ .retry(3)
+ .suppressFailureWhenFinished()
+ .executeWith(fileExecutor)
+ .onFailure((file, exc) -> LOG.warn("Failed to remove data file {} on abort job", file.path(), exc))
+ .run(file -> table.io().deleteFile(file.path().toString()));
+ }
+ });
+ } finally {
+ fileExecutor.shutdown();
+ if (tableExecutor != null) {
+ tableExecutor.shutdown();
+ }
}
- cleanup(jobContext);
+ LOG.info("Job {} is aborted. Data file cleaning finished", jobContext.getJobID());
+
+ cleanup(jobContext, jobLocations);
+ }
+
+ /**
+ * Collects the additions to a single table and adds/commits the new files to the Iceberg table.
+ * @param io The io to read the forCommit files
+ * @param executor The executor used to read the forCommit files
+ * @param jobContext The job context
+ * @param name The name of the table used for loading from the catalog
+ * @param location The location of the table used for loading from the catalog
+ */
+ private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name, String location) {
+ JobConf conf = jobContext.getJobConf();
+ Properties catalogProperties = new Properties();
+ catalogProperties.put(Catalogs.NAME, name);
+ catalogProperties.put(Catalogs.LOCATION, location);
+ Table table = Catalogs.loadTable(conf, catalogProperties);
+
+ long startTime = System.currentTimeMillis();
+ LOG.info("Committing job has started for table: {}, using location: {}",
+ table, generateJobLocation(location, conf, jobContext.getJobID()));
+
+ Collection<DataFile> dataFiles = dataFiles(executor, location, jobContext, io, true);
+
+ if (dataFiles.size() > 0) {
+ // Appending data files to the table
+ AppendFiles append = table.newAppend();
+ dataFiles.forEach(append::appendFile);
+ append.commit();
+ LOG.info("Commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime, table,
+ dataFiles.size());
+ LOG.debug("Added files {}", dataFiles);
+ } else {
+ LOG.info("Commit took {} ms for table: {} with no new files", System.currentTimeMillis() - startTime, table);
+ }
}
/**
- * Cleans up the jobs temporary location.
+ * Cleans up the job's temporary locations. For every target table there is a temp dir to clean up.
* @param jobContext The job context
+ * @param jobLocations The locations to clean up
* @throws IOException if there is a failure deleting the files
*/
- private void cleanup(JobContext jobContext) throws IOException {
- String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
- LOG.info("Cleaning for job: {} on location: {}", jobContext.getJobID(), location);
+ private void cleanup(JobContext jobContext, Collection<String> jobLocations) throws IOException {
+ JobConf jobConf = jobContext.getJobConf();
+
+ LOG.info("Cleaning for job {} started", jobContext.getJobID());
- // Remove the job's temp directory recursively.
- // Intentionally used foreach on a single item. Using the Tasks API here only for the retry capability.
- Tasks.foreach(location)
+ // Remove the job's temp directories recursively.
+ Tasks.foreach(jobLocations)
.retry(3)
.suppressFailureWhenFinished()
- .onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} on cleanup job", file, exc))
- .run(file -> {
- Path toDelete = new Path(file);
- FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf());
+ .onFailure((jobLocation, exc) -> LOG.debug("Failed to remove directory {} on job cleanup", jobLocation, exc))
+ .run(jobLocation -> {
+ LOG.info("Cleaning location: {}", jobLocation);
+ Path toDelete = new Path(jobLocation);
+ FileSystem fs = Util.getFs(toDelete, jobConf);
fs.delete(toDelete, true);
}, IOException.class);
+
+ LOG.info("Cleaning for job {} finished", jobContext.getJobID());
+ }
+
+ /**
+ * Executor service for parallel handling of file reads. Should be shared when committing multiple tables.
+ * @param conf The configuration containing the pool size
+ * @return The generated executor service
+ */
+ private static ExecutorService fileExecutor(Configuration conf) {
+ int size = conf.getInt(InputFormatConfig.COMMIT_FILE_THREAD_POOL_SIZE,
+ InputFormatConfig.COMMIT_FILE_THREAD_POOL_SIZE_DEFAULT);
+ return Executors.newFixedThreadPool(
+ size,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setPriority(Thread.NORM_PRIORITY)
+ .setNameFormat("iceberg-commit-file-pool-%d")
+ .build());
}
/**
- * Get the data committed data files for this job.
+ * Executor service for parallel handling of table manipulation. Could return null, if no parallelism is possible.
+ * @param conf The configuration containing the pool size
+ * @param maxThreadNum The number of requests we want to handle (might be decreased further by configuration)
+ * @return The generated executor service, or null if executor is not needed.
+ */
+ private static ExecutorService tableExecutor(Configuration conf, int maxThreadNum) {
+ int size = conf.getInt(InputFormatConfig.COMMIT_TABLE_THREAD_POOL_SIZE,
+ InputFormatConfig.COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT);
+ size = Math.min(maxThreadNum, size);
+ if (size > 1) {
+ return Executors.newFixedThreadPool(
+ size,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setPriority(Thread.NORM_PRIORITY)
+ .setNameFormat("iceberg-commit-table-pool-%d")
+ .build());
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Get the committed data files for this table and job.
+ * @param executor The executor used for reading the forCommit files in parallel
+ * @param location The location of the table
* @param jobContext The job context
* @param io The FileIO used for reading a files generated for commit
* @param throwOnFailure If <code>true</code> then it throws an exception on failure
* @return The list of the committed data files
*/
- private static List<DataFile> dataFiles(JobContext jobContext, FileIO io, boolean throwOnFailure) {
+ private static Collection<DataFile> dataFiles(ExecutorService executor, String location, JobContext jobContext,
+ FileIO io, boolean throwOnFailure) {
JobConf conf = jobContext.getJobConf();
// If there are reducers, then every reducer will generate a result file.
// If this is a map only task, then every mapper will generate a result file.
int expectedFiles = conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
- ExecutorService executor = null;
- try {
- // Creating executor service for parallel handling of file reads
- executor = Executors.newFixedThreadPool(
- conf.getInt(InputFormatConfig.COMMIT_THREAD_POOL_SIZE, InputFormatConfig.COMMIT_THREAD_POOL_SIZE_DEFAULT),
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setPriority(Thread.NORM_PRIORITY)
- .setNameFormat("iceberg-commit-pool-%d")
- .build());
-
- List<DataFile> dataFiles = Collections.synchronizedList(new ArrayList<>());
+ Collection<DataFile> dataFiles = new ConcurrentLinkedQueue<>();
- // Reading the committed files. The assumption here is that the taskIds are generated in sequential order
- // starting from 0.
- Tasks.range(expectedFiles)
- .throwFailureWhenFinished(throwOnFailure)
- .executeWith(executor)
- .retry(3)
- .run(taskId -> {
- String taskFileName = generateFileForCommitLocation(conf, jobContext.getJobID(), taskId);
- dataFiles.addAll(Arrays.asList(readFileForCommit(taskFileName, io)));
- });
+ // Reading the committed files. The assumption here is that the taskIds are generated in sequential order
+ // starting from 0.
+ Tasks.range(expectedFiles)
+ .throwFailureWhenFinished(throwOnFailure)
+ .executeWith(executor)
+ .retry(3)
+ .run(taskId -> {
+ String taskFileName = generateFileForCommitLocation(location, conf, jobContext.getJobID(), taskId);
+ dataFiles.addAll(Arrays.asList(readFileForCommit(taskFileName, io)));
+ });
- return dataFiles;
- } finally {
- if (executor != null) {
- executor.shutdown();
- }
- }
+ return dataFiles;
}
/**
* Generates the job temp location based on the job configuration.
- * Currently it uses QUERY_LOCATION-jobId.
+ * Currently it uses TABLE_LOCATION/temp/QUERY_ID-jobId.
+ * @param location The location of the table
* @param conf The job's configuration
* @param jobId The JobID for the task
* @return The file to store the results
*/
@VisibleForTesting
- static String generateJobLocation(Configuration conf, JobID jobId) {
- String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION);
+ static String generateJobLocation(String location, Configuration conf, JobID jobId) {
String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
- return tableLocation + "/temp/" + queryId + "-" + jobId;
+ return location + "/temp/" + queryId + "-" + jobId;
}
/**
* Generates file location based on the task configuration and a specific task id.
* This file will be used to store the data required to generate the Iceberg commit.
- * Currently it uses QUERY_LOCATION-jobId/task-[0..numTasks).forCommit.
+ * Currently it uses TABLE_LOCATION/temp/QUERY_ID-jobId/task-[0..numTasks).forCommit.
+ * @param location The location of the table
* @param conf The job's configuration
* @param jobId The jobId for the task
* @param taskId The taskId for the commit file
* @return The file to store the results
*/
- private static String generateFileForCommitLocation(Configuration conf, JobID jobId, int taskId) {
- return generateJobLocation(conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION;
+ private static String generateFileForCommitLocation(String location, Configuration conf, JobID jobId, int taskId) {
+ return generateJobLocation(location, conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION;
}
private static void createFileForCommit(DataFile[] closedFiles, String location, FileIO io)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 9a2ae60..080c015 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -23,6 +23,7 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.NullWritable;
@@ -41,6 +42,7 @@ import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.util.PropertyUtil;
@@ -66,7 +68,8 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
private static HiveIcebergRecordWriter writer(JobConf jc) {
TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
- Table table = HiveIcebergStorageHandler.table(jc);
+ // It gets the config from the FileSinkOperator which has its own config for every target table
+ Table table = HiveIcebergStorageHandler.table(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME));
Schema schema = HiveIcebergStorageHandler.schema(jc);
PartitionSpec spec = table.spec();
FileFormat fileFormat = FileFormat.valueOf(PropertyUtil.propertyAsString(table.properties(),
@@ -79,8 +82,9 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
OutputFileFactory outputFileFactory =
new OutputFileFactory(spec, fileFormat, location, io, encryption, taskAttemptID.getTaskID().getId(),
taskAttemptID.getId(), jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
+ String tableName = jc.get(Catalogs.NAME);
HiveIcebergRecordWriter writer = new HiveIcebergRecordWriter(schema, spec, fileFormat,
- new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, taskAttemptID);
+ new GenericAppenderFactory(schema, spec), outputFileFactory, io, targetFileSize, taskAttemptID, tableName);
return writer;
}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
index 8622d1a..2adf43d 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.mr.hive;
import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
@@ -38,6 +37,7 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitionedFanoutWriter;
import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,25 +50,26 @@ class HiveIcebergRecordWriter extends PartitionedFanoutWriter<Record>
private final PartitionKey currentKey;
private final FileIO io;
- // <TaskAttemptId, HiveIcebergRecordWriter> map to store the active writers
+ // <TaskAttemptId, <TABLE_NAME, HiveIcebergRecordWriter>> map to store the active writers
// Stored in concurrent map, since some executor engines can share containers
- private static final Map<TaskAttemptID, HiveIcebergRecordWriter> writers = new ConcurrentHashMap<>();
+ private static final Map<TaskAttemptID, Map<String, HiveIcebergRecordWriter>> writers = Maps.newConcurrentMap();
- static HiveIcebergRecordWriter removeWriter(TaskAttemptID taskAttemptID) {
+ static Map<String, HiveIcebergRecordWriter> removeWriters(TaskAttemptID taskAttemptID) {
return writers.remove(taskAttemptID);
}
- static HiveIcebergRecordWriter getWriter(TaskAttemptID taskAttemptID) {
+ static Map<String, HiveIcebergRecordWriter> getWriters(TaskAttemptID taskAttemptID) {
return writers.get(taskAttemptID);
}
HiveIcebergRecordWriter(Schema schema, PartitionSpec spec, FileFormat format,
FileAppenderFactory<Record> appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
- TaskAttemptID taskAttemptID) {
+ TaskAttemptID taskAttemptID, String tableName) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.io = io;
this.currentKey = new PartitionKey(spec, schema);
- writers.put(taskAttemptID, this);
+ writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
+ writers.get(taskAttemptID).put(tableName, this);
}
@Override
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 42a4982..015bcb0 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.mr.hive;
import java.io.Serializable;
+import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -41,10 +42,14 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializationUtil;
public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler {
+ private static final Splitter TABLE_NAME_SPLITTER = Splitter.on("..");
+ private static final String TABLE_NAME_SEPARATOR = "..";
static final String WRITE_KEY = "HiveIcebergStorageHandler_write";
@@ -108,7 +113,13 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
if (tableDesc != null && tableDesc.getProperties() != null &&
tableDesc.getProperties().get(WRITE_KEY) != null) {
+ Preconditions.checkArgument(!tableDesc.getTableName().contains(TABLE_NAME_SEPARATOR),
+ "Can not handle table " + tableDesc.getTableName() + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'");
+ String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES);
+ tables = tables == null ? tableDesc.getTableName() : tables + TABLE_NAME_SEPARATOR + tableDesc.getTableName();
+
jobConf.set("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName());
+ jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables);
}
}
@@ -142,12 +153,22 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
}
/**
- * Returns the Table serialized to the configuration.
+ * Returns the Table serialized to the configuration based on the table name.
* @param config The configuration used to get the data from
+ * @param name The name of the table we need as returned by TableDesc.getTableName()
* @return The Table
*/
- public static Table table(Configuration config) {
- return SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.SERIALIZED_TABLE));
+ public static Table table(Configuration config, String name) {
+ return SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + name));
+ }
+
+ /**
+ * Returns the names of the output tables stored in the configuration.
+ * @param config The configuration used to get the data from
+ * @return The collection of the table names as returned by TableDesc.getTableName()
+ */
+ public static Collection<String> outputTables(Configuration config) {
+ return TABLE_NAME_SPLITTER.splitToList(config.get(InputFormatConfig.OUTPUT_TABLES));
}
/**
@@ -190,7 +211,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
map.put(InputFormatConfig.TABLE_SCHEMA, schemaJson);
if (table instanceof Serializable) {
- map.put(InputFormatConfig.SERIALIZED_TABLE, SerializationUtil.serializeToBase64(table));
+ map.put(InputFormatConfig.SERIALIZED_TABLE_PREFIX + tableDesc.getTableName(),
+ SerializationUtil.serializeToBase64(table));
}
// We need to remove this otherwise the job.xml will be invalid as column comments are separated with '\0' and
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index a67c513..0a43189 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
@@ -64,6 +65,7 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -93,12 +95,9 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
@Override
public List<InputSplit> getSplits(JobContext context) {
Configuration conf = context.getConfiguration();
- Table table;
- if (conf.get(InputFormatConfig.SERIALIZED_TABLE) != null) {
- table = SerializationUtil.deserializeFromBase64(conf.get(InputFormatConfig.SERIALIZED_TABLE));
- } else {
- table = Catalogs.loadTable(conf);
- }
+ Table table = Optional
+ .ofNullable(HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
+ .orElseGet(() -> Catalogs.loadTable(conf));
TableScan scan = table.newScan()
.caseSensitive(conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT));
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
index cc1f2cb..b751af8 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
@@ -263,6 +263,7 @@ public class HiveIcebergTestUtils {
.collect(Collectors.toList());
Assert.assertEquals(dataFileNum, dataFiles.size());
- Assert.assertFalse(new File(HiveIcebergOutputCommitter.generateJobLocation(conf, jobId)).exists());
+ Assert.assertFalse(
+ new File(HiveIcebergOutputCommitter.generateJobLocation(table.location(), conf, jobId)).exists());
}
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 0e132df..1f5466b 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -47,10 +47,12 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SerializationUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -58,7 +60,7 @@ import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
-import static org.apache.iceberg.mr.hive.HiveIcebergRecordWriter.getWriter;
+import static org.apache.iceberg.mr.hive.HiveIcebergRecordWriter.getWriters;
import static org.apache.iceberg.types.Types.NestedField.required;
public class TestHiveIcebergOutputCommitter {
@@ -107,7 +109,7 @@ public class TestHiveIcebergOutputCommitter {
HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
Table table = table(temp.getRoot().getPath(), false);
JobConf conf = jobConf(table, 1);
- List<Record> expected = writeRecords(1, 0, true, false, conf);
+ List<Record> expected = writeRecords(table.name(), 1, 0, true, false, conf);
committer.commitJob(new JobContextImpl(conf, JOB_ID));
HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 1);
@@ -119,7 +121,7 @@ public class TestHiveIcebergOutputCommitter {
HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
Table table = table(temp.getRoot().getPath(), true);
JobConf conf = jobConf(table, 1);
- List<Record> expected = writeRecords(1, 0, true, false, conf);
+ List<Record> expected = writeRecords(table.name(), 1, 0, true, false, conf);
committer.commitJob(new JobContextImpl(conf, JOB_ID));
HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 3);
@@ -131,7 +133,7 @@ public class TestHiveIcebergOutputCommitter {
HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
Table table = table(temp.getRoot().getPath(), false);
JobConf conf = jobConf(table, 2);
- List<Record> expected = writeRecords(2, 0, true, false, conf);
+ List<Record> expected = writeRecords(table.name(), 2, 0, true, false, conf);
committer.commitJob(new JobContextImpl(conf, JOB_ID));
HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 2);
@@ -143,7 +145,7 @@ public class TestHiveIcebergOutputCommitter {
HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
Table table = table(temp.getRoot().getPath(), true);
JobConf conf = jobConf(table, 2);
- List<Record> expected = writeRecords(2, 0, true, false, conf);
+ List<Record> expected = writeRecords(table.name(), 2, 0, true, false, conf);
committer.commitJob(new JobContextImpl(conf, JOB_ID));
HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 6);
@@ -157,18 +159,18 @@ public class TestHiveIcebergOutputCommitter {
JobConf conf = jobConf(table, 2);
// Write records and abort the tasks
- writeRecords(2, 0, false, true, conf);
+ writeRecords(table.name(), 2, 0, false, true, conf);
HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 0);
HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0);
// Write records but do not abort the tasks
// The data files remain since we can not identify them but should not be read
- writeRecords(2, 1, false, false, conf);
+ writeRecords(table.name(), 2, 1, false, false, conf);
HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 2);
HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0);
// Write and commit the records
- List<Record> expected = writeRecords(2, 2, true, false, conf);
+ List<Record> expected = writeRecords(table.name(), 2, 2, true, false, conf);
committer.commitJob(new JobContextImpl(conf, JOB_ID));
HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 4);
HiveIcebergTestUtils.validateData(table, expected, 0);
@@ -179,7 +181,7 @@ public class TestHiveIcebergOutputCommitter {
HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
Table table = table(temp.getRoot().getPath(), false);
JobConf conf = jobConf(table, 1);
- writeRecords(1, 0, true, false, conf);
+ writeRecords(table.name(), 1, 0, true, false, conf);
committer.abortJob(new JobContextImpl(conf, JOB_ID), JobStatus.State.FAILED);
HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 0);
@@ -198,7 +200,7 @@ public class TestHiveIcebergOutputCommitter {
Table table = table(temp.getRoot().getPath(), false);
JobConf conf = jobConf(table, 1);
try {
- writeRecords(1, 0, true, false, conf, failingCommitter);
+ writeRecords(table.name(), 1, 0, true, false, conf, failingCommitter);
Assert.fail();
} catch (RuntimeException e) {
Assert.assertTrue(e.getMessage().contains(exceptionMessage));
@@ -207,10 +209,10 @@ public class TestHiveIcebergOutputCommitter {
Assert.assertEquals(1, argumentCaptor.getAllValues().size());
TaskAttemptID capturedId = TezUtil.taskAttemptWrapper(argumentCaptor.getValue().getTaskAttemptID());
// writer is still in the map after commitTask failure
- Assert.assertNotNull(getWriter(capturedId));
+ Assert.assertNotNull(getWriters(capturedId));
failingCommitter.abortTask(new TaskAttemptContextImpl(conf, capturedId));
// abortTask succeeds and removes writer
- Assert.assertNull(getWriter(capturedId));
+ Assert.assertNull(getWriters(capturedId));
}
private Table table(String location, boolean partitioned) {
@@ -223,6 +225,8 @@ public class TestHiveIcebergOutputCommitter {
conf.setNumMapTasks(taskNum);
conf.setNumReduceTasks(0);
conf.set(HiveConf.ConfVars.HIVEQUERYID.varname, QUERY_ID);
+ conf.set(InputFormatConfig.OUTPUT_TABLES, table.name());
+ conf.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + table.name(), SerializationUtil.serializeToBase64(table));
Map<String, String> propMap = Maps.newHashMap();
TableDesc tableDesc = new TableDesc();
@@ -237,6 +241,7 @@ public class TestHiveIcebergOutputCommitter {
/**
* Write random records to the given table using separate {@link HiveIcebergOutputCommitter} and
* a separate {@link HiveIcebergRecordWriter} for every task.
+ * @param name The name of the table, used to look up the table object in the conf
* @param taskNum The number of tasks in the job handled by the committer
* @param attemptNum The id used for attempt number generation
* @param commitTasks If <code>true</code> the tasks will be committed
@@ -247,11 +252,11 @@ public class TestHiveIcebergOutputCommitter {
* @return The random generated records which were appended to the table
* @throws IOException Propagating {@link HiveIcebergRecordWriter} exceptions
*/
- private List<Record> writeRecords(int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
+ private List<Record> writeRecords(String name, int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
JobConf conf, OutputCommitter committer) throws IOException {
List<Record> expected = new ArrayList<>(RECORD_NUM * taskNum);
- Table table = HiveIcebergStorageHandler.table(conf);
+ Table table = HiveIcebergStorageHandler.table(conf, name);
FileIO io = table.io();
LocationProvider location = table.locationProvider();
EncryptionManager encryption = table.encryption();
@@ -266,7 +271,7 @@ public class TestHiveIcebergOutputCommitter {
attemptNum, QUERY_ID + "-" + JOB_ID);
HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, FileFormat.PARQUET,
new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE,
- TezUtil.taskAttemptWrapper(taskId));
+ TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME));
Container<Record> container = new Container<>();
@@ -287,8 +292,8 @@ public class TestHiveIcebergOutputCommitter {
return expected;
}
- private List<Record> writeRecords(int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
+ private List<Record> writeRecords(String name, int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
JobConf conf) throws IOException {
- return writeRecords(taskNum, attemptNum, commitTasks, abortTasks, conf, new HiveIcebergOutputCommitter());
+ return writeRecords(name, taskNum, attemptNum, commitTasks, abortTasks, conf, new HiveIcebergOutputCommitter());
}
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index fcb4311..41aef8b 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -621,6 +621,47 @@ public class TestHiveIcebergStorageHandlerWithEngine {
HiveIcebergTestUtils.validateData(table, records, 0);
}
+ @Test
+ public void testMultiTableInsert() throws IOException {
+ Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+ testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+ Schema target1Schema = new Schema(
+ optional(1, "customer_id", Types.LongType.get()),
+ optional(2, "first_name", Types.StringType.get())
+ );
+
+ Schema target2Schema = new Schema(
+ optional(1, "last_name", Types.StringType.get()),
+ optional(2, "customer_id", Types.LongType.get())
+ );
+
+ List<Record> target1Records = TestHelper.RecordsBuilder.newInstance(target1Schema)
+ .add(0L, "Alice")
+ .add(1L, "Bob")
+ .add(2L, "Trudy")
+ .build();
+
+ List<Record> target2Records = TestHelper.RecordsBuilder.newInstance(target2Schema)
+ .add("Brown", 0L)
+ .add("Green", 1L)
+ .add("Pink", 2L)
+ .build();
+
+ Table target1 = testTables.createTable(shell, "target1", target1Schema, fileFormat, ImmutableList.of());
+ Table target2 = testTables.createTable(shell, "target2", target2Schema, fileFormat, ImmutableList.of());
+
+ shell.executeStatement("FROM customers " +
+ "INSERT INTO target1 SELECT customer_id, first_name " +
+ "INSERT INTO target2 SELECT last_name, customer_id");
+
+ // Check that each target table received the records expected from its own INSERT branch
+ HiveIcebergTestUtils.validateData(target1, target1Records, 0);
+ HiveIcebergTestUtils.validateData(target2, target2Records, 1);
+ }
+
private void testComplexTypeWrite(Schema schema, List<Record> records) throws IOException {
String tableName = "complex_table";
Table table = testTables.createTable(shell, "complex_table", schema, fileFormat, ImmutableList.of());