Posted to commits@hive.apache.org by pv...@apache.org on 2022/05/12 13:53:24 UTC
[hive] branch master updated: HIVE-26202: Refactor Iceberg Writers (Peter Vary reviewed by Laszlo Pinter) (#3269)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a1906b9f00 HIVE-26202: Refactor Iceberg Writers (Peter Vary reviewed by Laszlo Pinter) (#3269)
a1906b9f00 is described below
commit a1906b9f00a2ac182d10951cbc5e4c30b40aadc9
Author: pvary <pv...@cloudera.com>
AuthorDate: Thu May 12 15:53:16 2022 +0200
HIVE-26202: Refactor Iceberg Writers (Peter Vary reviewed by Laszlo Pinter) (#3269)
---
.../mr/hive/HiveIcebergOutputCommitter.java | 12 +-
.../iceberg/mr/hive/HiveIcebergOutputFormat.java | 51 +++-----
.../apache/iceberg/mr/hive/HiveIcebergSerDe.java | 22 ++--
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 18 +--
.../hive/{ => writer}/HiveFileWriterFactory.java | 6 +-
.../HiveIcebergBufferedDeleteWriter.java | 30 +++--
.../hive/{ => writer}/HiveIcebergDeleteWriter.java | 17 +--
.../hive/{ => writer}/HiveIcebergRecordWriter.java | 15 ++-
.../hive/{ => writer}/HiveIcebergUpdateWriter.java | 32 ++---
.../mr/hive/{ => writer}/HiveIcebergWriter.java | 9 +-
.../hive/{ => writer}/HiveIcebergWriterBase.java | 39 +-----
.../iceberg/mr/hive/writer/WriterBuilder.java | 133 +++++++++++++++++++++
.../iceberg/mr/hive/writer/WriterRegistry.java | 44 +++++++
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 34 ++++--
.../mr/hive/TestHiveIcebergOutputCommitter.java | 37 ++----
.../{ => writer}/HiveIcebergWriterTestBase.java | 14 ++-
.../{ => writer}/TestHiveIcebergDeleteWriter.java | 27 +----
.../{ => writer}/TestHiveIcebergUpdateWriter.java | 37 ++----
18 files changed, 340 insertions(+), 237 deletions(-)
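For reviewers skimming the diff below: the core of the refactor is that per-operation writer construction moves out of HiveIcebergOutputFormat into the new WriterBuilder, and the static writer map moves from HiveIcebergWriterBase into the new WriterRegistry. Condensed from the hunks that follow, the new creation path looks roughly like this (illustrative fragment, not a drop-in snippet):

    Table table = HiveIcebergStorageHandler.table(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME));
    String tableName = jc.get(Catalogs.NAME);
    HiveIcebergWriter writer = WriterBuilder.builderFor(table)
        .queryId(jc.get(HiveConf.ConfVars.HIVEQUERYID.varname))            // used to build the operationId
        .tableName(tableName)
        .attemptID(TezUtil.taskAttemptWrapper(jc))
        .poolSize(jc.getInt("iceberg.delete.file.thread.pool.size", 10))   // delete flush thread pool size
        .operation(HiveIcebergStorageHandler.operation(jc, tableName))     // DELETE, UPDATE or OTHER
        .build();                                                          // also registers into WriterRegistry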
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 0ea882a450..825ee3dc39 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -61,6 +61,8 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
+import org.apache.iceberg.mr.hive.writer.WriterRegistry;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -107,7 +109,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
TaskAttemptID attemptID = context.getTaskAttemptID();
JobConf jobConf = context.getJobConf();
Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
- Map<String, HiveIcebergWriter> writers = Optional.ofNullable(HiveIcebergWriterBase.getWriters(attemptID))
+ Map<String, HiveIcebergWriter> writers = Optional.ofNullable(WriterRegistry.writers(attemptID))
.orElseGet(() -> {
LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", outputs, attemptID);
return ImmutableMap.of();
@@ -146,7 +148,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
}
// remove the writer to release the object
- HiveIcebergWriterBase.removeWriters(attemptID);
+ WriterRegistry.removeWriters(attemptID);
}
/**
@@ -159,7 +161,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
// Clean up writer data from the local store
- Map<String, HiveIcebergWriter> writers = HiveIcebergWriterBase.removeWriters(context.getTaskAttemptID());
+ Map<String, HiveIcebergWriter> writers = WriterRegistry.removeWriters(context.getTaskAttemptID());
// Remove files if it was not done already
if (writers != null) {
@@ -335,8 +337,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
if (writeResults.isEmpty()) {
LOG.info(
- "Not creating a new commit for table: {}, jobID: {}, isDelete: {}, since there were no new files to add",
- table, jobContext.getJobID(), HiveIcebergStorageHandler.isDelete(conf, name));
+ "Not creating a new commit for table: {}, jobID: {}, operation: {}, since there were no new files to add",
+ table, jobContext.getJobID(), HiveIcebergStorageHandler.operation(conf, name));
} else {
commitWrite(table, startTime, writeResults);
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 552bffa236..7bf1b1ec0d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -19,7 +19,6 @@
package org.apache.iceberg.mr.hive;
-import java.util.Locale;
import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -32,19 +31,17 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.util.Progressable;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
+import org.apache.iceberg.mr.hive.writer.WriterBuilder;
import org.apache.iceberg.mr.mapred.Container;
-import org.apache.iceberg.util.PropertyUtil;
public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Container<Record>>,
HiveOutputFormat<NullWritable, Container<Record>> {
+ private static final String DELETE_FILE_THREAD_POOL_SIZE = "iceberg.delete.file.thread.pool.size";
+ private static final int DELETE_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
@Override
public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class valueClass,
@@ -63,39 +60,19 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
// Not doing any check.
}
- private static HiveIcebergWriterBase writer(JobConf jc) {
+ private static HiveIcebergWriter writer(JobConf jc) {
TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
// It gets the config from the FileSinkOperator which has its own config for every target table
Table table = HiveIcebergStorageHandler.table(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME));
- Schema schema = HiveIcebergStorageHandler.schema(jc);
- FileFormat fileFormat = FileFormat.valueOf(PropertyUtil.propertyAsString(table.properties(),
- TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT).toUpperCase(Locale.ENGLISH));
- long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
- TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
- FileIO io = table.io();
- int partitionId = taskAttemptID.getTaskID().getId();
- int taskId = taskAttemptID.getId();
- String operationId = jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID();
- OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
- .format(fileFormat)
- .operationId(operationId)
- .build();
String tableName = jc.get(Catalogs.NAME);
- if (HiveIcebergStorageHandler.isDelete(jc, tableName)) {
- HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, fileFormat, schema, null, fileFormat,
- null, null, null, schema);
- return new HiveIcebergDeleteWriter(schema, table.specs(), fileFormat, writerFactory, outputFileFactory, io,
- targetFileSize, taskAttemptID, tableName);
- } else if (HiveIcebergStorageHandler.isUpdate(jc, tableName)) {
- HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, fileFormat, schema, null, fileFormat,
- null, null, null, null);
- return new HiveIcebergUpdateWriter(schema, table.specs(), table.spec().specId(), fileFormat, writerFactory,
- outputFileFactory, io, targetFileSize, taskAttemptID, tableName, jc);
- } else {
- HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, fileFormat, schema, null, fileFormat,
- null, null, null, schema);
- return new HiveIcebergRecordWriter(schema, table.specs(), table.spec().specId(), fileFormat, writerFactory,
- outputFileFactory, io, targetFileSize, taskAttemptID, tableName, false);
- }
+ int poolSize = jc.getInt(DELETE_FILE_THREAD_POOL_SIZE, DELETE_FILE_THREAD_POOL_SIZE_DEFAULT);
+
+ return WriterBuilder.builderFor(table)
+ .queryId(jc.get(HiveConf.ConfVars.HIVEQUERYID.varname))
+ .tableName(tableName)
+ .attemptID(taskAttemptID)
+ .poolSize(poolSize)
+ .operation(HiveIcebergStorageHandler.operation(jc, tableName))
+ .build();
}
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index a4470058c7..99820583a1 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -144,15 +145,18 @@ public class HiveIcebergSerDe extends AbstractSerDe {
}
private static Schema projectedSchema(Configuration configuration, String tableName, Schema tableSchema) {
- if (HiveIcebergStorageHandler.isDelete(configuration, tableName)) {
- // when writing delete files, we should use the full delete schema
- return IcebergAcidUtil.createSerdeSchemaForDelete(tableSchema.columns());
- } else if (HiveIcebergStorageHandler.isUpdate(configuration, tableName)) {
- // when writing delete files, we should use the full delete schema
- return IcebergAcidUtil.createSerdeSchemaForUpdate(tableSchema.columns());
- } else if (HiveIcebergStorageHandler.isWrite(configuration, tableName)) {
- // when writing out data, we should not do projection push down
- return tableSchema;
+ Context.Operation operation = HiveIcebergStorageHandler.operation(configuration, tableName);
+ if (operation != null) {
+ switch (operation) {
+ case DELETE:
+ return IcebergAcidUtil.createSerdeSchemaForDelete(tableSchema.columns());
+ case UPDATE:
+ return IcebergAcidUtil.createSerdeSchemaForUpdate(tableSchema.columns());
+ case OTHER:
+ return tableSchema;
+ default:
+ throw new IllegalArgumentException("Unsupported operation " + operation);
+ }
} else {
configuration.setBoolean(InputFormatConfig.CASE_SENSITIVE, false);
String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(configuration);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 39042691e8..68bd647521 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -556,19 +556,13 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
return false;
}
- public static boolean isWrite(Configuration conf, String tableName) {
- return conf != null && tableName != null && Operation.OTHER.name().equals(
- conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
- }
-
- public static boolean isDelete(Configuration conf, String tableName) {
- return conf != null && tableName != null && Operation.DELETE.name().equals(
- conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
- }
+ public static Operation operation(Configuration conf, String tableName) {
+ if (conf == null || tableName == null) {
+ return null;
+ }
- public static boolean isUpdate(Configuration conf, String tableName) {
- return conf != null && tableName != null && Operation.UPDATE.name().equals(
- conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName));
+ String operation = conf.get(InputFormatConfig.OPERATION_TYPE_PREFIX + tableName);
+ return operation == null ? null : Operation.valueOf(operation);
}
/**
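The isWrite/isDelete/isUpdate trio collapses into a single operation() lookup that returns null when no operation was registered for the table, so callers branch on the enum instead of probing three booleans. A sketch of the typical consumption pattern, mirroring the switch in HiveIcebergSerDe above (hypothetical caller code):

    Operation operation = HiveIcebergStorageHandler.operation(conf, tableName);
    if (operation == null) {
      // no write operation registered, e.g. a plain read: fall back to projection
    } else {
      switch (operation) {
        case DELETE: /* use the full delete schema */ break;
        case UPDATE: /* use the full update schema */ break;
        case OTHER:  /* plain insert: no projection push down */ break;
        default: throw new IllegalArgumentException("Unsupported operation " + operation);
      }
    }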
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveFileWriterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
similarity index 95%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveFileWriterFactory.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
index 3e1af37347..3459ea5b1e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveFileWriterFactory.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
@@ -32,9 +32,9 @@ import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
-public class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
+class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
- protected HiveFileWriterFactory(
+ HiveFileWriterFactory(
Table table,
FileFormat dataFileFormat,
Schema dataSchema,
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergBufferedDeleteWriter.java
similarity index 88%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergBufferedDeleteWriter.java
index 99d59341ed..2d91053355 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergBufferedDeleteWriter.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
import java.io.IOException;
import java.util.Collection;
@@ -26,7 +26,6 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
@@ -43,6 +42,8 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.PartitioningWriter;
+import org.apache.iceberg.mr.hive.FilesForCommit;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -58,12 +59,9 @@ import org.slf4j.LoggerFactory;
* we only write out {@link PositionDelete} files where the row data is omitted, so only the filenames and the rowIds
* have to be kept in memory.
*/
-public class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
+class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergBufferedDeleteWriter.class);
- private static final String DELETE_FILE_THREAD_POOL_SIZE = "iceberg.delete.file.thread.pool.size";
- private static final int DELETE_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
-
// Storing deleted data in a map Partition -> FileName -> BitMap
private final Map<PartitionKey, Map<String, Roaring64Bitmap>> buffer = Maps.newHashMap();
private final Map<Integer, PartitionSpec> specs;
@@ -73,21 +71,21 @@ public class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
private final OutputFileFactory fileFactory;
private final FileIO io;
private final long targetFileSize;
- private final Configuration configuration;
+ private final int poolSize;
private final Record record;
private final InternalRecordWrapper wrapper;
private FilesForCommit filesForCommit;
- HiveIcebergBufferedDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileFormat format,
- FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
- Configuration configuration) {
+ HiveIcebergBufferedDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs,
+ FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileFormat format, FileIO io,
+ long targetFileSize, int poolSize) {
this.specs = specs;
this.format = format;
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
this.targetFileSize = targetFileSize;
- this.configuration = configuration;
+ this.poolSize = poolSize;
this.wrapper = new InternalRecordWrapper(schema.asStruct());
this.record = GenericRecord.create(schema);
}
@@ -114,7 +112,8 @@ public class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
if (!abort) {
LOG.info("Delete file flush is started");
- ExecutorService fileExecutor = fileExecutor(configuration, buffer.size());
+ int size = Math.min(buffer.size(), poolSize);
+ ExecutorService fileExecutor = fileExecutor(size);
try {
Tasks.foreach(buffer.keySet())
.retry(3)
@@ -165,13 +164,12 @@ public class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
/**
* Executor service for parallel writing of delete files.
- * @param conf The configuration containing the pool size
+ * @param poolSize The pool size
* @return The generated executor service
*/
- private static ExecutorService fileExecutor(Configuration conf, int maxSize) {
- int size = Math.min(maxSize, conf.getInt(DELETE_FILE_THREAD_POOL_SIZE, DELETE_FILE_THREAD_POOL_SIZE_DEFAULT));
+ private static ExecutorService fileExecutor(int poolSize) {
return Executors.newFixedThreadPool(
- size,
+ poolSize,
new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.NORM_PRIORITY)
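The buffered delete writer keeps memory usage low by never buffering row data: per partition, per data file, it stores only the deleted row positions in a Roaring64Bitmap (the Partition -> FileName -> BitMap map noted above). The write() body is outside this diff, but the buffering step presumably amounts to something like the following sketch, where partitionKey, filePath and position are hypothetical locals:

    // buffer is Map<PartitionKey, Map<String, Roaring64Bitmap>>
    buffer.computeIfAbsent(partitionKey, k -> Maps.newHashMap())
        .computeIfAbsent(filePath, f -> new Roaring64Bitmap())
        .addLong(position);  // only file name + row position are retained in memory

At close(false) the buffer is drained into actual PositionDelete files by the fixed-size thread pool, which is now sized by the poolSize constructor argument instead of a Configuration lookup.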
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
similarity index 85%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
index a31d29249c..368bfe1179 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergDeleteWriter.java
@@ -17,13 +17,12 @@
* under the License.
*/
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
@@ -36,17 +35,19 @@ import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.hive.FilesForCommit;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.mr.mapred.Container;
-public class HiveIcebergDeleteWriter extends HiveIcebergWriterBase {
+class HiveIcebergDeleteWriter extends HiveIcebergWriterBase {
private final GenericRecord rowDataTemplate;
- HiveIcebergDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileFormat fileFormat,
- FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
- TaskAttemptID taskAttemptID, String tableName) {
- super(schema, specs, io, taskAttemptID, tableName,
- new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSize), false);
+ HiveIcebergDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs,
+ FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileFormat fileFormat, FileIO io,
+ long targetFileSize) {
+ super(schema, specs, io,
+ new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSize));
rowDataTemplate = GenericRecord.create(schema);
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
similarity index 82%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
index 476b538893..bde57803be 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
@@ -17,13 +17,12 @@
* under the License.
*/
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
@@ -34,18 +33,18 @@ import org.apache.iceberg.io.DataWriteResult;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.hive.FilesForCommit;
import org.apache.iceberg.mr.mapred.Container;
class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
private final int currentSpecId;
- HiveIcebergRecordWriter(
- Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId, FileFormat format,
- FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
- TaskAttemptID taskAttemptID, String tableName, boolean wrapped) {
- super(schema, specs, io, taskAttemptID, tableName,
- new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize), wrapped);
+ HiveIcebergRecordWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
+ FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileFormat format, FileIO io,
+ long targetFileSize) {
+ super(schema, specs, io,
+ new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize));
this.currentSpecId = currentSpecId;
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUpdateWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergUpdateWriter.java
similarity index 77%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUpdateWriter.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergUpdateWriter.java
index f234e6b037..4f4154854e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUpdateWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergUpdateWriter.java
@@ -17,15 +17,13 @@
* under the License.
*/
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
@@ -34,11 +32,13 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.ClusteredDataWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.hive.FilesForCommit;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
/**
* Hive update queries are converted to an insert statement where the result contains the updated rows.
@@ -46,22 +46,19 @@ import org.apache.iceberg.mr.mapred.Container;
* The rows are sorted based on the requirements of the {@link HiveIcebergRecordWriter}.
* The {@link HiveIcebergBufferedDeleteWriter} needs to handle out of order records.
*/
-class HiveIcebergUpdateWriter extends HiveIcebergWriterBase {
+class HiveIcebergUpdateWriter implements HiveIcebergWriter {
private final HiveIcebergBufferedDeleteWriter deleteWriter;
private final HiveIcebergRecordWriter insertWriter;
private final Container<Record> container;
- HiveIcebergUpdateWriter(
- Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId, FileFormat format,
- FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
- TaskAttemptID taskAttemptID, String tableName, Configuration configuration) {
- super(schema, specs, io, taskAttemptID, tableName,
- new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize), false);
- this.deleteWriter = new HiveIcebergBufferedDeleteWriter(schema, specs, format, fileWriterFactory, fileFactory, io,
- targetFileSize, configuration);
- this.insertWriter = new HiveIcebergRecordWriter(schema, specs, currentSpecId, format, fileWriterFactory,
- fileFactory, io, targetFileSize, taskAttemptID, tableName, true);
+ HiveIcebergUpdateWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
+ FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, OutputFileFactory deleteFileFactory,
+ FileFormat format, FileFormat deleteFormat, FileIO io, long targetFileSize, int poolSize) {
+ this.deleteWriter = new HiveIcebergBufferedDeleteWriter(schema, specs, fileWriterFactory, deleteFileFactory,
+ deleteFormat, io, targetFileSize, poolSize);
+ this.insertWriter = new HiveIcebergRecordWriter(schema, specs, currentSpecId, fileWriterFactory, fileFactory,
+ format, io, targetFileSize);
this.container = new Container<>();
Record record = GenericRecord.create(schema);
container.set(record);
@@ -86,4 +83,9 @@ class HiveIcebergUpdateWriter extends HiveIcebergWriterBase {
Collection<DeleteFile> deleteFiles = deleteWriter.files().deleteFiles();
return new FilesForCommit(dataFiles, deleteFiles);
}
+
+ @VisibleForTesting
+ HiveIcebergWriter deleteWriter() {
+ return deleteWriter;
+ }
}
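HiveIcebergUpdateWriter now implements HiveIcebergWriter directly instead of extending HiveIcebergWriterBase: it is a pure composition of a buffered delete writer (for the pre-update row positions) and a record writer (for the post-update row images), merging both result sets in files() as shown above. The write() method is not part of this hunk; under that reading it presumably delegates roughly like this (hedged sketch, exact row handling assumed):

    public void write(Writable row) throws IOException {
      deleteWriter.write(row);        // buffer a position delete for the original row
      // copy the updated row image into the reusable container, then insert it
      insertWriter.write(container);
    }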
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriter.java
similarity index 79%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriter.java
index 1ea127e023..bf2e17a821 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriter.java
@@ -17,15 +17,19 @@
* under the License.
*/
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.hive.FilesForCommit;
import org.apache.iceberg.mr.mapred.Container;
-public interface HiveIcebergWriter {
+public interface HiveIcebergWriter extends FileSinkOperator.RecordWriter,
+ org.apache.hadoop.mapred.RecordWriter<NullWritable, Container<Record>> {
FilesForCommit files();
void close(boolean abort) throws IOException;
void write(Writable row) throws IOException;
@@ -37,5 +41,4 @@ public interface HiveIcebergWriter {
default void write(NullWritable key, Container value) throws IOException {
write(value);
}
-
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java
similarity index 67%
rename from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java
rename to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java
index cd0393f600..f286a0fa90 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java
@@ -17,14 +17,10 @@
* under the License.
*/
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
import java.io.IOException;
import java.util.Map;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -32,54 +28,29 @@ import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.PartitioningWriter;
-import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.hive.FilesForCommit;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("checkstyle:VisibilityModifier")
-public abstract class HiveIcebergWriterBase implements FileSinkOperator.RecordWriter,
- org.apache.hadoop.mapred.RecordWriter<NullWritable, Container<Record>>, HiveIcebergWriter {
+abstract class HiveIcebergWriterBase implements HiveIcebergWriter {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergWriterBase.class);
- private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
-
- static Map<String, HiveIcebergWriter> removeWriters(TaskAttemptID taskAttemptID) {
- return writers.remove(taskAttemptID);
- }
-
- static Map<String, HiveIcebergWriter> getWriters(TaskAttemptID taskAttemptID) {
- return writers.get(taskAttemptID);
- }
-
protected final FileIO io;
protected final InternalRecordWrapper wrapper;
protected final Map<Integer, PartitionSpec> specs;
protected final Map<Integer, PartitionKey> partitionKeys;
protected final PartitioningWriter writer;
- protected HiveIcebergWriterBase(Schema schema, Map<Integer, PartitionSpec> specs, FileIO io, TaskAttemptID attemptID,
- String tableName, PartitioningWriter writer, boolean wrapped) {
+ HiveIcebergWriterBase(Schema schema, Map<Integer, PartitionSpec> specs, FileIO io,
+ PartitioningWriter writer) {
this.io = io;
this.wrapper = new InternalRecordWrapper(schema.asStruct());
this.specs = specs;
this.partitionKeys = Maps.newHashMapWithExpectedSize(specs.size());
this.writer = writer;
- if (!wrapped) {
- writers.putIfAbsent(attemptID, Maps.newConcurrentMap());
- writers.get(attemptID).put(tableName, this);
- }
- }
-
- @Override
- public void write(NullWritable key, Container value) throws IOException {
- write(value);
- }
-
- @Override
- public void close(Reporter reporter) throws IOException {
- close(false);
}
@Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
new file mode 100644
index 0000000000..b4b26738c4
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.writer;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.hadoop.hive.ql.Context.Operation;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.util.PropertyUtil;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
+
+public class WriterBuilder {
+ private final Table table;
+ private String tableName;
+ private TaskAttemptID attemptID;
+ private String queryId;
+ private int poolSize;
+ private Operation operation;
+
+ private WriterBuilder(Table table) {
+ this.table = table;
+ }
+
+ public static WriterBuilder builderFor(Table table) {
+ return new WriterBuilder(table);
+ }
+
+ public WriterBuilder tableName(String newTableName) {
+ this.tableName = newTableName;
+ return this;
+ }
+
+ public WriterBuilder attemptID(TaskAttemptID newAttemptID) {
+ this.attemptID = newAttemptID;
+ return this;
+ }
+
+ public WriterBuilder queryId(String newQueryId) {
+ this.queryId = newQueryId;
+ return this;
+ }
+
+ public WriterBuilder poolSize(int newPoolSize) {
+ this.poolSize = newPoolSize;
+ return this;
+ }
+
+ public WriterBuilder operation(Operation newOperation) {
+ this.operation = newOperation;
+ return this;
+ }
+
+ public HiveIcebergWriter build() {
+ Map<String, String> properties = table.properties();
+
+ String dataFileFormatName = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+ FileFormat dataFileFormat = FileFormat.valueOf(dataFileFormatName.toUpperCase(Locale.ENGLISH));
+
+ String deleteFileFormatName = properties.getOrDefault(DELETE_DEFAULT_FILE_FORMAT, dataFileFormatName);
+ FileFormat deleteFileFormat = FileFormat.valueOf(deleteFileFormatName.toUpperCase(Locale.ENGLISH));
+
+ long targetFileSize = PropertyUtil.propertyAsLong(table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+
+ Schema dataSchema = table.schema();
+ FileIO io = table.io();
+ Map<Integer, PartitionSpec> specs = table.specs();
+ int currentSpecId = table.spec().specId();
+ int partitionId = attemptID.getTaskID().getId();
+ int taskId = attemptID.getId();
+ String operationId = queryId + "-" + attemptID.getJobID();
+ OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+ .format(dataFileFormat)
+ .operationId("data-" + operationId)
+ .build();
+
+ OutputFileFactory deleteOutputFileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+ .format(deleteFileFormat)
+ .operationId("delete-" + operationId)
+ .build();
+
+ Schema positionDeleteRowSchema = operation == Operation.UPDATE ? null : dataSchema;
+ HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, dataFileFormat, dataSchema, null,
+ deleteFileFormat, null, null, null,
+ positionDeleteRowSchema);
+
+ HiveIcebergWriter writer;
+ switch (operation) {
+ case UPDATE:
+ writer = new HiveIcebergUpdateWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
+ deleteOutputFileFactory, dataFileFormat, deleteFileFormat, io, targetFileSize, poolSize);
+ break;
+ case DELETE:
+ writer = new HiveIcebergDeleteWriter(dataSchema, specs, writerFactory, deleteOutputFileFactory,
+ deleteFileFormat, io, targetFileSize);
+ break;
+ default:
+ writer = new HiveIcebergRecordWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
+ dataFileFormat, io, targetFileSize);
+ break;
+ }
+
+ WriterRegistry.registerWriter(attemptID, tableName, writer);
+ return writer;
+ }
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java
new file mode 100644
index 0000000000..dba4973e41
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.writer;
+
+import java.util.Map;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class WriterRegistry {
+ private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
+
+ private WriterRegistry() {
+ }
+
+ public static Map<String, HiveIcebergWriter> removeWriters(TaskAttemptID taskAttemptID) {
+ return writers.remove(taskAttemptID);
+ }
+
+ public static void registerWriter(TaskAttemptID taskAttemptID, String tableName, HiveIcebergWriter writer) {
+ writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
+ writers.get(taskAttemptID).put(tableName, writer);
+ }
+
+ public static Map<String, HiveIcebergWriter> writers(TaskAttemptID taskAttemptID) {
+ return writers.get(taskAttemptID);
+ }
+}
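WriterRegistry takes over the static writer map that previously lived in HiveIcebergWriterBase, decoupling writer registration from the writer base class. Pieced together from this commit, the lifecycle is: WriterBuilder.build() registers the writer under its task attempt, the output committer looks writers up in commitTask, and removeWriters() releases them after commit or on abort. Condensed from the hunks above:

    // at writer creation (WriterBuilder.build):
    WriterRegistry.registerWriter(attemptID, tableName, writer);

    // at task commit (HiveIcebergOutputCommitter.commitTask):
    Map<String, HiveIcebergWriter> writers =
        Optional.ofNullable(WriterRegistry.writers(attemptID)).orElseGet(ImmutableMap::of);

    // after commit / on abort, releasing the objects:
    WriterRegistry.removeWriters(attemptID);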
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 82636ff95a..0ccb750fda 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.ql.Context.Operation;
import org.apache.hadoop.hive.ql.io.PositionDeleteInfo;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.mapred.JobConf;
@@ -240,6 +241,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
private T current;
private CloseableIterator<T> currentIterator;
private Table table;
+ private boolean updateOrDelete;
@Override
public void initialize(InputSplit split, TaskAttemptContext newContext) {
@@ -257,6 +259,8 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
this.inMemoryDataModel = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
InputFormatConfig.InMemoryDataModel.GENERIC);
this.currentIterator = open(tasks.next(), expectedSchema).iterator();
+ Operation operation = HiveIcebergStorageHandler.operation(conf, conf.get(Catalogs.NAME));
+ this.updateOrDelete = Operation.DELETE.equals(operation) || Operation.UPDATE.equals(operation);
}
@Override
@@ -264,8 +268,7 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
while (true) {
if (currentIterator.hasNext()) {
current = currentIterator.next();
- if (HiveIcebergStorageHandler.isDelete(conf, conf.get(Catalogs.NAME)) ||
- HiveIcebergStorageHandler.isUpdate(conf, conf.get(Catalogs.NAME))) {
+ if (updateOrDelete) {
GenericRecord rec = (GenericRecord) current;
PositionDeleteInfo.setIntoConf(conf,
IcebergAcidUtil.parseSpecId(rec),
@@ -508,17 +511,24 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
readSchema = caseSensitive ? table.schema().select(selectedColumns) :
table.schema().caseInsensitiveSelect(selectedColumns);
- // for DELETE queries, add additional metadata columns into the read schema
- if (HiveIcebergStorageHandler.isDelete(conf, conf.get(Catalogs.NAME))) {
- readSchema = IcebergAcidUtil.createFileReadSchemaForDelete(readSchema.columns(), table);
- }
-
- // for UPDATE queries, add additional metadata columns into the read schema
- if (HiveIcebergStorageHandler.isUpdate(conf, conf.get(Catalogs.NAME))) {
- readSchema = IcebergAcidUtil.createFileReadSchemaForUpdate(readSchema.columns(), table);
+ Operation operation = HiveIcebergStorageHandler.operation(conf, conf.get(Catalogs.NAME));
+ if (operation != null) {
+ switch (operation) {
+ case DELETE:
+ // for DELETE queries, add additional metadata columns into the read schema
+ return IcebergAcidUtil.createFileReadSchemaForDelete(readSchema.columns(), table);
+ case UPDATE:
+ // for UPDATE queries, add additional metadata columns into the read schema
+ return IcebergAcidUtil.createFileReadSchemaForUpdate(readSchema.columns(), table);
+ case OTHER:
+ // for INSERT queries no extra columns are needed
+ return readSchema;
+ default:
+ throw new IllegalArgumentException("Unsupported operation " + operation);
+ }
+ } else {
+ return readSchema;
}
-
- return readSchema;
}
private static Schema schemaWithoutConstantsAndMeta(Schema readSchema, Map<Integer, ?> idToConstant) {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 5857d858f5..d0eb3ebc8f 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -35,17 +35,17 @@ import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
+import org.apache.iceberg.mr.hive.writer.WriterBuilder;
+import org.apache.iceberg.mr.hive.writer.WriterRegistry;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -59,11 +59,9 @@ import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
-import static org.apache.iceberg.mr.hive.HiveIcebergWriterBase.getWriters;
import static org.apache.iceberg.types.Types.NestedField.required;
public class TestHiveIcebergOutputCommitter {
- private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
private static final int RECORD_NUM = 5;
private static final String QUERY_ID = "query_id";
private static final JobID JOB_ID = new JobID("test", 0);
@@ -209,10 +207,10 @@ public class TestHiveIcebergOutputCommitter {
Assert.assertEquals(1, argumentCaptor.getAllValues().size());
TaskAttemptID capturedId = TezUtil.taskAttemptWrapper(argumentCaptor.getValue().getTaskAttemptID());
// writer is still in the map after commitTask failure
- Assert.assertNotNull(getWriters(capturedId));
+ Assert.assertNotNull(WriterRegistry.writers(capturedId));
failingCommitter.abortTask(new TaskAttemptContextImpl(conf, capturedId));
// abortTask succeeds and removes writer
- Assert.assertNull(getWriters(capturedId));
+ Assert.assertNull(WriterRegistry.writers(capturedId));
}
private Table table(String location, boolean partitioned) {
@@ -247,7 +245,7 @@ public class TestHiveIcebergOutputCommitter {
/**
* Write random records to the given table using separate {@link HiveIcebergOutputCommitter} and
- * a separate {@link HiveIcebergRecordWriter} for every task.
+ * a separate {@link HiveIcebergWriter} for every task.
* @param name The name of the table to get the table object from the conf
* @param taskNum The number of tasks in the job handled by the committer
* @param attemptNum The id used for attempt number generation
@@ -257,14 +255,13 @@ public class TestHiveIcebergOutputCommitter {
* @param conf The job configuration
* @param committer The output committer that should be used for committing/aborting the tasks
* @return The random generated records which were appended to the table
- * @throws IOException Propagating {@link HiveIcebergRecordWriter} exceptions
+ * @throws IOException Propagating {@link HiveIcebergWriter} exceptions
*/
private List<Record> writeRecords(String name, int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks,
JobConf conf, OutputCommitter committer) throws IOException {
List<Record> expected = Lists.newArrayListWithExpectedSize(RECORD_NUM * taskNum);
Table table = HiveIcebergStorageHandler.table(conf, name);
- FileIO io = table.io();
Schema schema = HiveIcebergStorageHandler.schema(conf);
for (int i = 0; i < taskNum; ++i) {
@@ -273,23 +270,15 @@ public class TestHiveIcebergOutputCommitter {
for (int j = 0; j < RECORD_NUM; ++j) {
records.get(j).setField("customer_id", j / 3L);
}
+
TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, i, attemptNum);
- int partitionId = taskId.getTaskID().getId();
- String operationId = QUERY_ID + "-" + JOB_ID;
- FileFormat fileFormat = FileFormat.PARQUET;
- OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, partitionId, attemptNum)
- .format(fileFormat)
- .operationId(operationId)
+ HiveIcebergWriter testWriter = WriterBuilder.builderFor(table)
+ .attemptID(TezUtil.taskAttemptWrapper(taskId))
+ .queryId("Q_ID")
+ .tableName(conf.get(Catalogs.NAME))
+ .operation(Context.Operation.OTHER)
.build();
- HiveFileWriterFactory hfwf = new HiveFileWriterFactory(table, fileFormat, schema,
- null, fileFormat, null, null, null, null);
-
-
- HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, table.specs(),
- table.spec().specId(), fileFormat, hfwf, outputFileFactory, io, TARGET_FILE_SIZE,
- TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME), false);
-
Container<Record> container = new Container<>();
for (Record record : records) {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergWriterTestBase.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterTestBase.java
similarity index 91%
rename from iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergWriterTestBase.java
rename to iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterTestBase.java
index 1199391304..1a818a131a 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergWriterTestBase.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterTestBase.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
import java.io.File;
import java.io.IOException;
@@ -25,6 +25,9 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
@@ -69,6 +72,7 @@ public class HiveIcebergWriterTestBase {
private final HadoopTables tables = new HadoopTables(new HiveConf());
private TestHelper helper;
protected Table table;
+ protected WriterBuilder writerBuilder;
@Rule
public TemporaryFolder temp = new TemporaryFolder();
@@ -108,6 +112,14 @@ public class HiveIcebergWriterTestBase {
TableOperations ops = ((BaseTable) table).operations();
TableMetadata meta = ops.current();
ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+ JobID jobId = new JobID("test", 0);
+ TaskAttemptID taskAttemptID =
+ new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), TaskType.MAP, 0, 0);
+ writerBuilder = WriterBuilder.builderFor(table)
+ .attemptID(taskAttemptID)
+ .queryId("Q_ID")
+ .tableName("dummy");
}
@After
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java
similarity index 78%
rename from iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergDeleteWriter.java
rename to iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java
index 6b6e9c4b1e..9cac3a0262 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergDeleteWriter.java
@@ -17,23 +17,21 @@
* under the License.
*/
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -45,13 +43,11 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestHiveIcebergDeleteWriter extends HiveIcebergWriterTestBase {
- private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
- private static final JobID JOB_ID = new JobID("test", 0);
private static final Set<Integer> DELETED_IDS = Sets.newHashSet(29, 61, 89, 100, 122);
@Test
public void testDelete() throws IOException {
- HiveIcebergDeleteWriter testWriter = deleteWriter();
+ HiveIcebergWriter testWriter = deleteWriter();
List<GenericRecord> deleteRecords = deleteRecords(table, DELETED_IDS);
@@ -99,18 +95,7 @@ public class TestHiveIcebergDeleteWriter extends HiveIcebergWriterTestBase {
return deleteRecords;
}
- private HiveIcebergDeleteWriter deleteWriter() {
- OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, 1, 2)
- .format(fileFormat)
- .operationId("3")
- .build();
-
- HiveFileWriterFactory hfwf = new HiveFileWriterFactory(table, fileFormat, SCHEMA, null, fileFormat, null, null,
- null, null);
-
- TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, 1, 0);
-
- return new HiveIcebergDeleteWriter(table.schema(), table.specs(), fileFormat, hfwf, outputFileFactory, table.io(),
- TARGET_FILE_SIZE, taskId, "partitioned");
+ private HiveIcebergWriter deleteWriter() {
+ return writerBuilder.operation(Context.Operation.DELETE).build();
}
}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergUpdateWriter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergUpdateWriter.java
similarity index 77%
rename from iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergUpdateWriter.java
rename to iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergUpdateWriter.java
index 37df7803ab..1ecf7f85f6 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergUpdateWriter.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergUpdateWriter.java
@@ -17,24 +17,21 @@
* under the License.
*/
-package org.apache.iceberg.mr.hive;
+package org.apache.iceberg.mr.hive.writer;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -46,10 +43,6 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestHiveIcebergUpdateWriter extends HiveIcebergWriterTestBase {
- private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
- private static final JobID JOB_ID = new JobID("test", 0);
- private static final TaskAttemptID TASK_ATTEMPT_ID =
- new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, 0, 0);
private static final Map<Integer, GenericRecord> UPDATED_RECORDS = ImmutableMap.of(
29, record(29, "d"),
61, record(61, "h"),
@@ -58,15 +51,15 @@ public class TestHiveIcebergUpdateWriter extends HiveIcebergWriterTestBase {
122, record(142, "k"));
/**
- * This test just runs sends the data through the DeleteWriter. Here we make sure that the correct rows are removed.
+ * This test just sends the data through the deleteWriter. Here we make sure that the correct rows are removed.
* @throws IOException If there is an error
*/
@Test
public void testDelete() throws IOException {
- HiveIcebergWriter testWriter = new HiveIcebergBufferedDeleteWriter(table.schema(), table.specs(), fileFormat,
- hiveFileWriterFactory(), outputFileFactory(), table.io(), TARGET_FILE_SIZE, new HiveConf());
+ HiveIcebergWriter updateWriter = writerBuilder.poolSize(10).operation(Context.Operation.UPDATE).build();
+ HiveIcebergWriter deleteWriter = ((HiveIcebergUpdateWriter) updateWriter).deleteWriter();
- update(table, testWriter);
+ update(table, deleteWriter);
StructLikeSet expected = rowSetWithoutIds(RECORDS, UPDATED_RECORDS.keySet());
StructLikeSet actual = actualRowSet(table);
@@ -80,9 +73,7 @@ public class TestHiveIcebergUpdateWriter extends HiveIcebergWriterTestBase {
*/
@Test
public void testUpdate() throws IOException {
- HiveIcebergWriter testWriter = new HiveIcebergUpdateWriter(table.schema(), table.specs(), table.spec().specId(),
- fileFormat, hiveFileWriterFactory(), outputFileFactory(), table.io(), TARGET_FILE_SIZE, TASK_ATTEMPT_ID,
- "table_name", new HiveConf());
+ HiveIcebergWriter testWriter = writerBuilder.poolSize(10).operation(Context.Operation.UPDATE).build();
update(table, testWriter);
@@ -93,18 +84,6 @@ public class TestHiveIcebergUpdateWriter extends HiveIcebergWriterTestBase {
Assert.assertEquals("Table should contain expected rows", expected, actual);
}
- private OutputFileFactory outputFileFactory() {
- return OutputFileFactory.builderFor(table, 1, 2)
- .format(fileFormat)
- .operationId("3")
- .build();
- }
-
- private HiveFileWriterFactory hiveFileWriterFactory() {
- return new HiveFileWriterFactory(table, fileFormat, SCHEMA, null, fileFormat, null, null,
- null, null);
- }
-
private static void update(Table table, HiveIcebergWriter testWriter) throws IOException {
List<GenericRecord> updateRecords = updateRecords(table, UPDATED_RECORDS);