Posted to commits@hive.apache.org by pv...@apache.org on 2022/05/04 08:50:15 UTC
[hive] branch master updated: HIVE-26183: Create delete writer for the UPDATE statements (Peter Vary reviewed by Adam Szita and Marton Bod) (#3251)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9067431bee HIVE-26183: Create delete writer for the UPDATE statements (Peter Vary reviewed by Adam Szita and Marton Bod) (#3251)
9067431bee is described below
commit 9067431bee431c3f51124e30c6551ed04b2e3d22
Author: pvary <pv...@cloudera.com>
AuthorDate: Wed May 4 10:50:04 2022 +0200
HIVE-26183: Create delete writer for the UPDATE statements (Peter Vary reviewed by Adam Szita and Marton Bod) (#3251)
---
iceberg/iceberg-handler/pom.xml | 7 +-
.../org/apache/iceberg/mr/hive/FilesForCommit.java | 18 +-
.../mr/hive/HiveIcebergBufferedDeleteWriter.java | 181 +++++++++++++++++++++
.../iceberg/mr/hive/HiveIcebergDeleteWriter.java | 4 +-
.../mr/hive/HiveIcebergOutputCommitter.java | 6 +-
.../iceberg/mr/hive/HiveIcebergOutputFormat.java | 4 +-
.../iceberg/mr/hive/HiveIcebergRecordWriter.java | 6 +-
.../iceberg/mr/hive/HiveIcebergUpdateWriter.java | 89 ++++++++++
.../apache/iceberg/mr/hive/HiveIcebergWriter.java | 84 +---------
...ebergWriter.java => HiveIcebergWriterBase.java} | 18 +-
.../apache/iceberg/mr/hive/IcebergAcidUtil.java | 108 +++++++++---
.../org/apache/iceberg/data/IcebergGenerics2.java | 103 ++++++++++++
.../java/org/apache/iceberg/mr/TestHelper.java | 27 +++
.../iceberg/mr/hive/HiveIcebergWriterTestBase.java | 151 +++++++++++++++++
.../mr/hive/TestHiveIcebergDeleteWriter.java | 116 +++++++++++++
.../mr/hive/TestHiveIcebergOutputCommitter.java | 4 +-
.../mr/hive/TestHiveIcebergUpdateWriter.java | 159 ++++++++++++++++++
iceberg/pom.xml | 6 +
18 files changed, 963 insertions(+), 128 deletions(-)
diff --git a/iceberg/iceberg-handler/pom.xml b/iceberg/iceberg-handler/pom.xml
index 6a37fdbf16..20ae1e6ec9 100644
--- a/iceberg/iceberg-handler/pom.xml
+++ b/iceberg/iceberg-handler/pom.xml
@@ -100,11 +100,16 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.9.22</version>
- <scope>test</scope>
</dependency>
</dependencies>
<build>
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
index 0dd490628c..237ef55369 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
@@ -20,8 +20,8 @@
package org.apache.iceberg.mr.hive;
import java.io.Serializable;
+import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
@@ -30,19 +30,19 @@ import org.apache.iceberg.DeleteFile;
public class FilesForCommit implements Serializable {
- private final List<DataFile> dataFiles;
- private final List<DeleteFile> deleteFiles;
+ private final Collection<DataFile> dataFiles;
+ private final Collection<DeleteFile> deleteFiles;
- public FilesForCommit(List<DataFile> dataFiles, List<DeleteFile> deleteFiles) {
+ public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> deleteFiles) {
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
}
- public static FilesForCommit onlyDelete(List<DeleteFile> deleteFiles) {
+ public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles) {
return new FilesForCommit(Collections.emptyList(), deleteFiles);
}
- public static FilesForCommit onlyData(List<DataFile> dataFiles) {
+ public static FilesForCommit onlyData(Collection<DataFile> dataFiles) {
return new FilesForCommit(dataFiles, Collections.emptyList());
}
@@ -50,15 +50,15 @@ public class FilesForCommit implements Serializable {
return new FilesForCommit(Collections.emptyList(), Collections.emptyList());
}
- public List<DataFile> dataFiles() {
+ public Collection<DataFile> dataFiles() {
return dataFiles;
}
- public List<DeleteFile> deleteFiles() {
+ public Collection<DeleteFile> deleteFiles() {
return deleteFiles;
}
- public List<? extends ContentFile> allFiles() {
+ public Collection<? extends ContentFile> allFiles() {
return Stream.concat(dataFiles.stream(), deleteFiles.stream()).collect(Collectors.toList());
}
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java
new file mode 100644
index 0000000000..99d59341ed
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningWriter;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.roaringbitmap.longlong.PeekableLongIterator;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link HiveIcebergBufferedDeleteWriter} needs to handle out-of-order records.
+ * We need to keep the incoming records in memory until they are written out. To keep the memory consumption minimal,
+ * we only write out {@link PositionDelete} files where the row data is omitted, so only the file names and the rowIds
+ * have to be kept in memory.
+ */
+public class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergBufferedDeleteWriter.class);
+
+ private static final String DELETE_FILE_THREAD_POOL_SIZE = "iceberg.delete.file.thread.pool.size";
+ private static final int DELETE_FILE_THREAD_POOL_SIZE_DEFAULT = 10;
+
+ // Storing the deleted row positions in a map: Partition -> FileName -> BitMap of positions
+ private final Map<PartitionKey, Map<String, Roaring64Bitmap>> buffer = Maps.newHashMap();
+ private final Map<Integer, PartitionSpec> specs;
+ private final Map<PartitionKey, PartitionSpec> keyToSpec = Maps.newHashMap();
+ private final FileFormat format;
+ private final FileWriterFactory<Record> writerFactory;
+ private final OutputFileFactory fileFactory;
+ private final FileIO io;
+ private final long targetFileSize;
+ private final Configuration configuration;
+ private final Record record;
+ private final InternalRecordWrapper wrapper;
+ private FilesForCommit filesForCommit;
+
+ HiveIcebergBufferedDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileFormat format,
+ FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
+ Configuration configuration) {
+ this.specs = specs;
+ this.format = format;
+ this.writerFactory = writerFactory;
+ this.fileFactory = fileFactory;
+ this.io = io;
+ this.targetFileSize = targetFileSize;
+ this.configuration = configuration;
+ this.wrapper = new InternalRecordWrapper(schema.asStruct());
+ this.record = GenericRecord.create(schema);
+ }
+
+ @Override
+ public void write(Writable row) throws IOException {
+ Record rec = ((Container<Record>) row).get();
+ IcebergAcidUtil.populateWithOriginalValues(rec, record);
+ String filePath = IcebergAcidUtil.parseFilePath(rec);
+ int specId = IcebergAcidUtil.parseSpecId(rec);
+
+ Map<String, Roaring64Bitmap> deleteMap =
+ buffer.computeIfAbsent(partition(record, specId), key -> {
+ keyToSpec.put(key, specs.get(specId));
+ return Maps.newHashMap();
+ });
+ Roaring64Bitmap deletes = deleteMap.computeIfAbsent(filePath, unused -> new Roaring64Bitmap());
+ deletes.add(IcebergAcidUtil.parseFilePosition(rec));
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ long startTime = System.currentTimeMillis();
+ Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
+ if (!abort) {
+ LOG.info("Delete file flush is started");
+ ExecutorService fileExecutor = fileExecutor(configuration, buffer.size());
+ try {
+ Tasks.foreach(buffer.keySet())
+ .retry(3)
+ .executeWith(fileExecutor)
+ .onFailure((partition, exception) -> LOG.info("Failed to write delete file {}", partition, exception))
+ .run(partition -> {
+ PositionDelete<Record> positionDelete = PositionDelete.create();
+ PartitioningWriter writerForFiles;
+ try (PartitioningWriter writer =
+ new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, format, targetFileSize)) {
+ Map<String, Roaring64Bitmap> deleteRows = buffer.get(partition);
+ for (String filePath : new TreeSet<>(deleteRows.keySet())) {
+ Roaring64Bitmap deletes = deleteRows.get(filePath);
+ PeekableLongIterator longIterator = deletes.getLongIterator();
+ while (longIterator.hasNext()) {
+ long position = longIterator.next();
+ positionDelete.set(filePath, position, null);
+ writer.write(positionDelete, keyToSpec.get(partition), partition);
+ }
+ }
+ // We need the writer object later to get the generated delete files
+ writerForFiles = writer;
+ }
+ deleteFiles.addAll(((DeleteWriteResult) writerForFiles.result()).deleteFiles());
+ }, IOException.class);
+ } finally {
+ fileExecutor.shutdown();
+ }
+ }
+
+ LOG.info("HiveIcebergBufferedDeleteWriter is closed with abort={}. Created {} delete files and it took {} ns.",
+ abort, deleteFiles.size(), System.currentTimeMillis() - startTime);
+ LOG.debug("Delete files written {}", deleteFiles);
+
+ this.filesForCommit = FilesForCommit.onlyDelete(deleteFiles);
+ }
+
+ @Override
+ public FilesForCommit files() {
+ return filesForCommit;
+ }
+
+ protected PartitionKey partition(Record row, int specId) {
+ PartitionKey partitionKey = new PartitionKey(specs.get(specId), specs.get(specId).schema());
+ partitionKey.partition(wrapper.wrap(row));
+ return partitionKey;
+ }
+
+ /**
+ * Executor service for parallel writing of delete files.
+ * @param conf The configuration containing the pool size
+ * @return The generated executor service
+ */
+ private static ExecutorService fileExecutor(Configuration conf, int maxSize) {
+ int size = Math.min(maxSize, conf.getInt(DELETE_FILE_THREAD_POOL_SIZE, DELETE_FILE_THREAD_POOL_SIZE_DEFAULT));
+ return Executors.newFixedThreadPool(
+ size,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setPriority(Thread.NORM_PRIORITY)
+ .setNameFormat("iceberg-delete-file-pool-%d")
+ .build());
+ }
+}
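
For context on the file above: HiveIcebergBufferedDeleteWriter never buffers row data, only (file path, row position) pairs, grouped per partition and stored in Roaring64Bitmaps, which are replayed as position delete files in close(). A minimal standalone sketch of that buffering step (class and method names here are illustrative, not part of the patch):

    import java.util.Map;
    import java.util.TreeMap;
    import org.roaringbitmap.longlong.PeekableLongIterator;
    import org.roaringbitmap.longlong.Roaring64Bitmap;

    public class PositionDeleteBufferSketch {
      // file path -> bitmap of deleted row positions within that file
      private final Map<String, Roaring64Bitmap> buffer = new TreeMap<>();

      public void markDeleted(String filePath, long position) {
        buffer.computeIfAbsent(filePath, unused -> new Roaring64Bitmap()).add(position);
      }

      // Replays the buffered deletes in file/position order, the same way the real
      // writer feeds a ClusteredPositionDeleteWriter when it is closed.
      public void replay() {
        for (Map.Entry<String, Roaring64Bitmap> entry : buffer.entrySet()) {
          PeekableLongIterator positions = entry.getValue().getLongIterator();
          while (positions.hasNext()) {
            System.out.printf("delete file=%s position=%d%n", entry.getKey(), positions.next());
          }
        }
      }
    }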
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java
index a03a1ee1dd..a31d29249c 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergDeleteWriter.java
@@ -38,7 +38,7 @@ import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.mapred.Container;
-public class HiveIcebergDeleteWriter extends HiveIcebergWriter {
+public class HiveIcebergDeleteWriter extends HiveIcebergWriterBase {
private final GenericRecord rowDataTemplate;
@@ -46,7 +46,7 @@ public class HiveIcebergDeleteWriter extends HiveIcebergWriter {
FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
TaskAttemptID taskAttemptID, String tableName) {
super(schema, specs, io, taskAttemptID, tableName,
- new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSize));
+ new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, fileFormat, targetFileSize), false);
rowDataTemplate = GenericRecord.create(schema);
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 92dfa1cf78..00edc3bd2e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -107,7 +107,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
TaskAttemptID attemptID = context.getTaskAttemptID();
JobConf jobConf = context.getJobConf();
Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
- Map<String, HiveIcebergWriter> writers = Optional.ofNullable(HiveIcebergWriter.getWriters(attemptID))
+ Map<String, HiveIcebergWriter> writers = Optional.ofNullable(HiveIcebergWriterBase.getWriters(attemptID))
.orElseGet(() -> {
LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", outputs, attemptID);
return ImmutableMap.of();
@@ -146,7 +146,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
}
// remove the writer to release the object
- HiveIcebergWriter.removeWriters(attemptID);
+ HiveIcebergWriterBase.removeWriters(attemptID);
}
/**
@@ -159,7 +159,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
// Clean up writer data from the local store
- Map<String, HiveIcebergWriter> writers = HiveIcebergWriter.removeWriters(context.getTaskAttemptID());
+ Map<String, HiveIcebergWriter> writers = HiveIcebergWriterBase.removeWriters(context.getTaskAttemptID());
// Remove files if it was not done already
if (writers != null) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 8cb378c470..ea3b29b439 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -63,7 +63,7 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
// Not doing any check.
}
- private static HiveIcebergWriter writer(JobConf jc) {
+ private static HiveIcebergWriterBase writer(JobConf jc) {
TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
// It gets the config from the FileSinkOperator which has its own config for every target table
Table table = HiveIcebergStorageHandler.table(jc, jc.get(hive_metastoreConstants.META_TABLE_NAME));
@@ -88,7 +88,7 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
targetFileSize, taskAttemptID, tableName);
} else {
return new HiveIcebergRecordWriter(schema, table.specs(), table.spec().specId(), fileFormat, writerFactory,
- outputFileFactory, io, targetFileSize, taskAttemptID, tableName);
+ outputFileFactory, io, targetFileSize, taskAttemptID, tableName, false);
}
}
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
index f12b1fda20..476b538893 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java
@@ -36,16 +36,16 @@ import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.mapred.Container;
-class HiveIcebergRecordWriter extends HiveIcebergWriter {
+class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
private final int currentSpecId;
HiveIcebergRecordWriter(
Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId, FileFormat format,
FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
- TaskAttemptID taskAttemptID, String tableName) {
+ TaskAttemptID taskAttemptID, String tableName, boolean wrapped) {
super(schema, specs, io, taskAttemptID, tableName,
- new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize));
+ new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize), wrapped);
this.currentSpecId = currentSpecId;
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUpdateWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUpdateWriter.java
new file mode 100644
index 0000000000..f234e6b037
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergUpdateWriter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.ClusteredDataWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.mapred.Container;
+
+/**
+ * Hive update queries are converted to an insert statement where the result contains the updated rows.
+ * The schema is defined by {@link IcebergAcidUtil#createFileReadSchemaForUpdate(List, Table)}.
+ * The rows are sorted based on the requirements of the {@link HiveIcebergRecordWriter}.
+ * The {@link HiveIcebergBufferedDeleteWriter} needs to handle out-of-order records.
+ */
+class HiveIcebergUpdateWriter extends HiveIcebergWriterBase {
+
+ private final HiveIcebergBufferedDeleteWriter deleteWriter;
+ private final HiveIcebergRecordWriter insertWriter;
+ private final Container<Record> container;
+
+ HiveIcebergUpdateWriter(
+ Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId, FileFormat format,
+ FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize,
+ TaskAttemptID taskAttemptID, String tableName, Configuration configuration) {
+ super(schema, specs, io, taskAttemptID, tableName,
+ new ClusteredDataWriter<>(fileWriterFactory, fileFactory, io, format, targetFileSize), false);
+ this.deleteWriter = new HiveIcebergBufferedDeleteWriter(schema, specs, format, fileWriterFactory, fileFactory, io,
+ targetFileSize, configuration);
+ this.insertWriter = new HiveIcebergRecordWriter(schema, specs, currentSpecId, format, fileWriterFactory,
+ fileFactory, io, targetFileSize, taskAttemptID, tableName, true);
+ this.container = new Container<>();
+ Record record = GenericRecord.create(schema);
+ container.set(record);
+ }
+
+ @Override
+ public void write(Writable row) throws IOException {
+ deleteWriter.write(row);
+ IcebergAcidUtil.populateWithNewValues(((Container<Record>) row).get(), container.get());
+ insertWriter.write(container);
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ deleteWriter.close(abort);
+ insertWriter.close(abort);
+ }
+
+ @Override
+ public FilesForCommit files() {
+ Collection<DataFile> dataFiles = insertWriter.files().dataFiles();
+ Collection<DeleteFile> deleteFiles = deleteWriter.files().deleteFiles();
+ return new FilesForCommit(dataFiles, deleteFiles);
+ }
+}
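
The write() path above is delete-then-insert on the same incoming row: the buffered delete writer records where the old copy lives, then the new column values are copied into a reusable record and appended through the plain record writer. A small sketch of that flow, using simplified stand-in types rather than the Iceberg/Hive classes:

    import java.io.IOException;
    import java.util.function.BiConsumer;

    public class UpdateWriteSketch {
      interface SimpleWriter { void write(Object[] row) throws IOException; }

      private final SimpleWriter deleteWriter;   // records the old row's file/position
      private final SimpleWriter insertWriter;   // writes the new row data
      private final Object[] reusableNewRow;     // plays the role of the Container<Record> in the patch
      private final BiConsumer<Object[], Object[]> copyNewValues; // like IcebergAcidUtil.populateWithNewValues

      UpdateWriteSketch(SimpleWriter deleteWriter, SimpleWriter insertWriter, int dataCols,
          BiConsumer<Object[], Object[]> copyNewValues) {
        this.deleteWriter = deleteWriter;
        this.insertWriter = insertWriter;
        this.reusableNewRow = new Object[dataCols];
        this.copyNewValues = copyNewValues;
      }

      void write(Object[] updateRow) throws IOException {
        deleteWriter.write(updateRow);              // position delete for the old copy
        copyNewValues.accept(updateRow, reusableNewRow);
        insertWriter.write(reusableNewRow);         // data file entry for the new copy
      }
    }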
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
index 57c13f31cb..1ea127e023 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
@@ -20,90 +20,22 @@
package org.apache.iceberg.mr.hive;
import java.io.IOException;
-import java.util.Map;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.iceberg.PartitionKey;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.InternalRecordWrapper;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.PartitioningWriter;
import org.apache.iceberg.mr.mapred.Container;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.util.Tasks;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-@SuppressWarnings("checkstyle:VisibilityModifier")
-public abstract class HiveIcebergWriter implements FileSinkOperator.RecordWriter,
- org.apache.hadoop.mapred.RecordWriter<NullWritable, Container<Record>> {
- private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergWriter.class);
+public interface HiveIcebergWriter {
+ FilesForCommit files();
+ void close(boolean abort) throws IOException;
+ void write(Writable row) throws IOException;
- private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
-
- static Map<String, HiveIcebergWriter> removeWriters(TaskAttemptID taskAttemptID) {
- return writers.remove(taskAttemptID);
- }
-
- static Map<String, HiveIcebergWriter> getWriters(TaskAttemptID taskAttemptID) {
- return writers.get(taskAttemptID);
- }
-
- protected final FileIO io;
- protected final InternalRecordWrapper wrapper;
- protected final Map<Integer, PartitionSpec> specs;
- protected final Map<Integer, PartitionKey> partitionKeys;
- protected final PartitioningWriter writer;
-
- protected HiveIcebergWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileIO io, TaskAttemptID attemptID,
- String tableName, PartitioningWriter writer) {
- this.io = io;
- this.wrapper = new InternalRecordWrapper(schema.asStruct());
- this.specs = specs;
- this.partitionKeys = Maps.newHashMapWithExpectedSize(specs.size());
- this.writer = writer;
- writers.putIfAbsent(attemptID, Maps.newConcurrentMap());
- writers.get(attemptID).put(tableName, this);
- }
-
- protected abstract FilesForCommit files();
-
- @Override
- public void write(NullWritable key, Container value) throws IOException {
- write(value);
- }
-
- @Override
- public void close(Reporter reporter) throws IOException {
+ default void close(Reporter reporter) throws IOException {
close(false);
}
- @Override
- public void close(boolean abort) throws IOException {
- writer.close();
- FilesForCommit result = files();
-
- // If abort then remove the unnecessary files
- if (abort) {
- Tasks.foreach(result.allFiles())
- .retry(3)
- .suppressFailureWhenFinished()
- .onFailure((file, exception) -> LOG.debug("Failed on to remove file {} on abort", file, exception))
- .run(file -> io.deleteFile(file.path().toString()));
- }
-
- LOG.info("HiveIcebergWriter is closed with abort={}. Created {} data files and {} delete files", abort,
- result.dataFiles().size(), result.deleteFiles().size());
+ default void write(NullWritable key, Container value) throws IOException {
+ write(value);
}
- protected PartitionKey partition(Record row, int specId) {
- PartitionKey partitionKey = partitionKeys.computeIfAbsent(specId,
- id -> new PartitionKey(specs.get(id), specs.get(id).schema()));
- partitionKey.partition(wrapper.wrap(row));
- return partitionKey;
- }
}
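
After this refactor, HiveIcebergWriter is a small interface (files(), close(boolean), write(Writable)) plus default mapred adapters, while registration in the static writers map moves to HiveIcebergWriterBase. A minimal implementation only has to cover the three abstract methods; for illustration, a throwaway no-op writer that is not part of the patch:

    package org.apache.iceberg.mr.hive;

    import java.io.IOException;
    import java.util.Collections;
    import org.apache.hadoop.io.Writable;

    // No-op writer showing the minimal surface of the new interface; the default
    // close(Reporter) and write(NullWritable, Container) methods are inherited.
    class NoOpIcebergWriter implements HiveIcebergWriter {
      @Override
      public void write(Writable row) throws IOException {
        // drop the row
      }

      @Override
      public void close(boolean abort) throws IOException {
        // nothing buffered, nothing to clean up
      }

      @Override
      public FilesForCommit files() {
        return FilesForCommit.onlyData(Collections.emptyList());
      }
    }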
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java
similarity index 88%
copy from iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
copy to iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java
index 57c13f31cb..cd0393f600 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergWriterBase.java
@@ -39,9 +39,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("checkstyle:VisibilityModifier")
-public abstract class HiveIcebergWriter implements FileSinkOperator.RecordWriter,
- org.apache.hadoop.mapred.RecordWriter<NullWritable, Container<Record>> {
- private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergWriter.class);
+public abstract class HiveIcebergWriterBase implements FileSinkOperator.RecordWriter,
+ org.apache.hadoop.mapred.RecordWriter<NullWritable, Container<Record>>, HiveIcebergWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergWriterBase.class);
private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
@@ -59,19 +59,19 @@ public abstract class HiveIcebergWriter implements FileSinkOperator.RecordWriter
protected final Map<Integer, PartitionKey> partitionKeys;
protected final PartitioningWriter writer;
- protected HiveIcebergWriter(Schema schema, Map<Integer, PartitionSpec> specs, FileIO io, TaskAttemptID attemptID,
- String tableName, PartitioningWriter writer) {
+ protected HiveIcebergWriterBase(Schema schema, Map<Integer, PartitionSpec> specs, FileIO io, TaskAttemptID attemptID,
+ String tableName, PartitioningWriter writer, boolean wrapped) {
this.io = io;
this.wrapper = new InternalRecordWrapper(schema.asStruct());
this.specs = specs;
this.partitionKeys = Maps.newHashMapWithExpectedSize(specs.size());
this.writer = writer;
- writers.putIfAbsent(attemptID, Maps.newConcurrentMap());
- writers.get(attemptID).put(tableName, this);
+ if (!wrapped) {
+ writers.putIfAbsent(attemptID, Maps.newConcurrentMap());
+ writers.get(attemptID).put(tableName, this);
+ }
}
- protected abstract FilesForCommit files();
-
@Override
public void write(NullWritable key, Container value) throws IOException {
write(value);
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
index 2a358f4fe4..faa2ca3ec1 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergAcidUtil.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.mr.hive;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -38,24 +39,24 @@ public class IcebergAcidUtil {
}
private static final Types.NestedField PARTITION_STRUCT_META_COL = null; // placeholder value in the map
- private static final Map<Types.NestedField, Integer> DELETE_FILE_READ_META_COLS = Maps.newLinkedHashMap();
+ private static final Map<Types.NestedField, Integer> FILE_READ_META_COLS = Maps.newLinkedHashMap();
static {
- DELETE_FILE_READ_META_COLS.put(MetadataColumns.SPEC_ID, 0);
- DELETE_FILE_READ_META_COLS.put(PARTITION_STRUCT_META_COL, 1);
- DELETE_FILE_READ_META_COLS.put(MetadataColumns.FILE_PATH, 2);
- DELETE_FILE_READ_META_COLS.put(MetadataColumns.ROW_POSITION, 3);
+ FILE_READ_META_COLS.put(MetadataColumns.SPEC_ID, 0);
+ FILE_READ_META_COLS.put(PARTITION_STRUCT_META_COL, 1);
+ FILE_READ_META_COLS.put(MetadataColumns.FILE_PATH, 2);
+ FILE_READ_META_COLS.put(MetadataColumns.ROW_POSITION, 3);
}
private static final Types.NestedField PARTITION_HASH_META_COL = Types.NestedField.required(
MetadataColumns.PARTITION_COLUMN_ID, MetadataColumns.PARTITION_COLUMN_NAME, Types.LongType.get());
- private static final Map<Types.NestedField, Integer> DELETE_SERDE_META_COLS = Maps.newLinkedHashMap();
+ private static final Map<Types.NestedField, Integer> SERDE_META_COLS = Maps.newLinkedHashMap();
static {
- DELETE_SERDE_META_COLS.put(MetadataColumns.SPEC_ID, 0);
- DELETE_SERDE_META_COLS.put(PARTITION_HASH_META_COL, 1);
- DELETE_SERDE_META_COLS.put(MetadataColumns.FILE_PATH, 2);
- DELETE_SERDE_META_COLS.put(MetadataColumns.ROW_POSITION, 3);
+ SERDE_META_COLS.put(MetadataColumns.SPEC_ID, 0);
+ SERDE_META_COLS.put(PARTITION_HASH_META_COL, 1);
+ SERDE_META_COLS.put(MetadataColumns.FILE_PATH, 2);
+ SERDE_META_COLS.put(MetadataColumns.ROW_POSITION, 3);
}
/**
@@ -64,8 +65,8 @@ public class IcebergAcidUtil {
* @return The schema for reading files, extended with metadata columns needed for deletes
*/
public static Schema createFileReadSchemaForDelete(List<Types.NestedField> dataCols, Table table) {
- List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + DELETE_FILE_READ_META_COLS.size());
- DELETE_FILE_READ_META_COLS.forEach((metaCol, index) -> {
+ List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + FILE_READ_META_COLS.size());
+ FILE_READ_META_COLS.forEach((metaCol, index) -> {
if (metaCol == PARTITION_STRUCT_META_COL) {
cols.add(MetadataColumns.metadataColumn(table, MetadataColumns.PARTITION_COLUMN_NAME));
} else {
@@ -81,23 +82,25 @@ public class IcebergAcidUtil {
* @return The schema for SerDe operations, extended with metadata columns needed for deletes
*/
public static Schema createSerdeSchemaForDelete(List<Types.NestedField> dataCols) {
- List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + DELETE_SERDE_META_COLS.size());
- DELETE_SERDE_META_COLS.forEach((metaCol, index) -> cols.add(metaCol));
+ List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
+ SERDE_META_COLS.forEach((metaCol, index) -> cols.add(metaCol));
cols.addAll(dataCols);
return new Schema(cols);
}
/**
+ * Based on `rec`, the method creates a position delete object, and also populates the data fields of `rowData` with
+ * the field values from `rec`.
* @param rec The record read by the file scan task, which contains both the metadata fields and the row data fields
* @param rowData The record object to populate with the rowData fields only
* @return The position delete object
*/
public static PositionDelete<Record> getPositionDelete(Record rec, Record rowData) {
PositionDelete<Record> positionDelete = PositionDelete.create();
- String filePath = rec.get(DELETE_SERDE_META_COLS.get(MetadataColumns.FILE_PATH), String.class);
- long filePosition = rec.get(DELETE_SERDE_META_COLS.get(MetadataColumns.ROW_POSITION), Long.class);
+ String filePath = rec.get(SERDE_META_COLS.get(MetadataColumns.FILE_PATH), String.class);
+ long filePosition = rec.get(SERDE_META_COLS.get(MetadataColumns.ROW_POSITION), Long.class);
- int dataOffset = DELETE_SERDE_META_COLS.size(); // position in the rec where the actual row data begins
+ int dataOffset = SERDE_META_COLS.size(); // position in the rec where the actual row data begins
for (int i = dataOffset; i < rec.size(); ++i) {
rowData.set(i - dataOffset, rec.get(i));
}
@@ -106,22 +109,85 @@ public class IcebergAcidUtil {
return positionDelete;
}
+ /**
+ * @param dataCols The columns of the original file read schema
+ * @param table The table object - it is used for populating the partition struct meta column
+ * @return The schema for reading files, extended with metadata columns needed for updates
+ */
+ public static Schema createFileReadSchemaForUpdate(List<Types.NestedField> dataCols, Table table) {
+ List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
+ SERDE_META_COLS.forEach((metaCol, index) -> {
+ if (metaCol == PARTITION_STRUCT_META_COL) {
+ cols.add(MetadataColumns.metadataColumn(table, MetadataColumns.PARTITION_COLUMN_NAME));
+ } else {
+ cols.add(metaCol);
+ }
+ });
+ // New column values
+ cols.addAll(dataCols);
+ // Old column values
+ cols.addAll(dataCols.stream()
+ .map(f -> Types.NestedField.optional(1147483545 + f.fieldId(), "__old_value_for_" + f.name(), f.type()))
+ .collect(Collectors.toList()));
+ return new Schema(cols);
+ }
+
+ /**
+ * @param dataCols The columns of the serde projection schema
+ * @return The schema for SerDe operations, extended with metadata columns needed for updates
+ */
+ public static Schema createSerdeSchemaForUpdate(List<Types.NestedField> dataCols) {
+ List<Types.NestedField> cols = Lists.newArrayListWithCapacity(dataCols.size() + SERDE_META_COLS.size());
+ SERDE_META_COLS.forEach((metaCol, index) -> cols.add(metaCol));
+ // New column values
+ cols.addAll(dataCols);
+ // Old column values
+ cols.addAll(dataCols.stream()
+ .map(f -> Types.NestedField.optional(1147483545 + f.fieldId(), "__old_value_for_" + f.name(), f.type()))
+ .collect(Collectors.toList()));
+ return new Schema(cols);
+ }
+
+ /**
+ * Get the original record from the updated record. Populates `original` with the field values from `rec`.
+ * @param rec The record read by the file scan task, which contains both the metadata fields and the row data fields
+ * @param original The record object to populate. The end result is the original record before the update.
+ */
+ public static void populateWithOriginalValues(Record rec, Record original) {
+ int dataOffset = SERDE_META_COLS.size() + original.size();
+ for (int i = dataOffset; i < dataOffset + original.size(); ++i) {
+ original.set(i - dataOffset, rec.get(i));
+ }
+ }
+
+ /**
+ * Get the new record from the updated record. Populates `newRecord` with the field values from `rec`.
+ * @param rec The record read by the file scan task, which contains both the metadata fields and the row data fields
+ * @param newRecord The record object to populate. The end result is the new record after the update.
+ */
+ public static void populateWithNewValues(Record rec, Record newRecord) {
+ int dataOffset = SERDE_META_COLS.size();
+ for (int i = dataOffset; i < dataOffset + newRecord.size(); ++i) {
+ newRecord.set(i - dataOffset, rec.get(i));
+ }
+ }
+
public static int parseSpecId(Record rec) {
- return rec.get(DELETE_FILE_READ_META_COLS.get(MetadataColumns.SPEC_ID), Integer.class);
+ return rec.get(FILE_READ_META_COLS.get(MetadataColumns.SPEC_ID), Integer.class);
}
public static long computePartitionHash(Record rec) {
- StructProjection part = rec.get(DELETE_FILE_READ_META_COLS.get(PARTITION_STRUCT_META_COL), StructProjection.class);
+ StructProjection part = rec.get(FILE_READ_META_COLS.get(PARTITION_STRUCT_META_COL), StructProjection.class);
// we need to compute a hash value for the partition struct so that it can be used as a sorting key
return computeHash(part);
}
public static String parseFilePath(Record rec) {
- return rec.get(DELETE_FILE_READ_META_COLS.get(MetadataColumns.FILE_PATH), String.class);
+ return rec.get(FILE_READ_META_COLS.get(MetadataColumns.FILE_PATH), String.class);
}
public static long parseFilePosition(Record rec) {
- return rec.get(DELETE_FILE_READ_META_COLS.get(MetadataColumns.ROW_POSITION), Long.class);
+ return rec.get(FILE_READ_META_COLS.get(MetadataColumns.ROW_POSITION), Long.class);
}
private static long computeHash(StructProjection struct) {
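
The update schemas built above lay each row out as four metadata columns, then the new column values, then the old column values; populateWithNewValues and populateWithOriginalValues simply copy from the matching offset. A self-contained sketch of that layout, using plain arrays in place of Iceberg Records (names and sample values are illustrative only):

    public class UpdateRowLayoutSketch {
      static final int META_COLS = 4; // spec id, partition hash, file path, row position

      // copies the new (post-update) values out of the combined update row
      static Object[] newValues(Object[] updateRow, int dataCols) {
        Object[] result = new Object[dataCols];
        System.arraycopy(updateRow, META_COLS, result, 0, dataCols);
        return result;
      }

      // copies the original (pre-update) values out of the combined update row
      static Object[] originalValues(Object[] updateRow, int dataCols) {
        Object[] result = new Object[dataCols];
        System.arraycopy(updateRow, META_COLS + dataCols, result, 0, dataCols);
        return result;
      }

      public static void main(String[] args) {
        // [specId, partitionHash, filePath, rowPos, newId, newData, oldId, oldData]
        Object[] row = {0, 42L, "data-00001.parquet", 7L, 29, "d", 29, "a"};
        System.out.println(java.util.Arrays.toString(newValues(row, 2)));      // [29, d]
        System.out.println(java.util.Arrays.toString(originalValues(row, 2))); // [29, a]
      }
    }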
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/IcebergGenerics2.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/IcebergGenerics2.java
new file mode 100644
index 0000000000..fc87302591
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/data/IcebergGenerics2.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+
+public class IcebergGenerics2 {
+ private IcebergGenerics2() {
+ }
+
+ /**
+ * Returns a builder to configure a read of the given table that produces generic records.
+ *
+ * @param table an Iceberg table
+ * @return a builder to configure the scan
+ */
+ public static ScanBuilder read(Table table) {
+ return new ScanBuilder(table);
+ }
+
+ public static class ScanBuilder {
+ private TableScan tableScan;
+ private boolean reuseContainers = false;
+
+ public ScanBuilder(Table table) {
+ this.tableScan = table.newScan();
+ }
+
+ public ScanBuilder reuseContainers() {
+ this.reuseContainers = true;
+ return this;
+ }
+
+ public ScanBuilder where(Expression rowFilter) {
+ this.tableScan = tableScan.filter(rowFilter);
+ return this;
+ }
+
+ public ScanBuilder caseInsensitive() {
+ this.tableScan = tableScan.caseSensitive(false);
+ return this;
+ }
+
+ public ScanBuilder select(String... selectedColumns) {
+ this.tableScan = tableScan.select(ImmutableList.copyOf(selectedColumns));
+ return this;
+ }
+
+ public ScanBuilder project(Schema schema) {
+ this.tableScan = tableScan.project(schema);
+ return this;
+ }
+
+ public ScanBuilder useSnapshot(long scanSnapshotId) {
+ this.tableScan = tableScan.useSnapshot(scanSnapshotId);
+ return this;
+ }
+
+ public ScanBuilder asOfTime(long scanTimestampMillis) {
+ this.tableScan = tableScan.asOfTime(scanTimestampMillis);
+ return this;
+ }
+
+ public ScanBuilder appendsBetween(long fromSnapshotId, long toSnapshotId) {
+ this.tableScan = tableScan.appendsBetween(fromSnapshotId, toSnapshotId);
+ return this;
+ }
+
+ public ScanBuilder appendsAfter(long fromSnapshotId) {
+ this.tableScan = tableScan.appendsAfter(fromSnapshotId);
+ return this;
+ }
+
+ public CloseableIterable<Record> build() {
+ return new TableScanIterable(
+ tableScan,
+ reuseContainers
+ );
+ }
+ }
+}
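
IcebergGenerics2 above is a test-local scan builder exposing snapshot and projection options for generic records; the writer tests later in this patch use it to read every row back with a given projection, roughly as follows (a small usage sketch mirroring HiveIcebergWriterTestBase.readRecords):

    import java.io.IOException;
    import java.util.List;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.IcebergGenerics2;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;

    public class ScanBuilderUsage {
      // Reads all rows of the table with the given projection.
      static List<Record> readAll(Table table, Schema projection) throws IOException {
        List<Record> rows = Lists.newArrayList();
        try (CloseableIterable<Record> reader = IcebergGenerics2.read(table).project(projection).build()) {
          reader.forEach(rows::add);
        }
        return rows;
      }
    }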
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
index c7170420bb..25c04aebfd 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
@@ -40,6 +41,7 @@ import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.rules.TemporaryFolder;
@@ -120,6 +122,31 @@ public class TestHelper {
appender().appendToTable(partition, records);
}
+ /**
+ * Appends the rows to the table. If the table is partitioned then it will create the correct partitions.
+ * @param rowSet The rows to add
+ * @throws IOException If there is an exception during writing out the files
+ */
+ public void appendToTable(List<Record> rowSet) throws IOException {
+ // The rows collected by partitions
+ Map<PartitionKey, List<Record>> rows = Maps.newHashMap();
+ PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+ for (Record record : rowSet) {
+ partitionKey.partition(record);
+ List<Record> partitionRows = rows.get(partitionKey);
+ if (partitionRows == null) {
+ partitionRows = Lists.newArrayList();
+ rows.put(partitionKey.copy(), partitionRows);
+ }
+
+ partitionRows.add(record);
+ }
+
+ for (PartitionKey partition : rows.keySet()) {
+ appendToTable(partition, rows.get(partition));
+ }
+ }
+
public DataFile writeFile(StructLike partition, List<Record> records) throws IOException {
return appender().writeFile(partition, records);
}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergWriterTestBase.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergWriterTestBase.java
new file mode 100644
index 0000000000..1199391304
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergWriterTestBase.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics2;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.Parameterized;
+
+public class HiveIcebergWriterTestBase {
+ // Schema passed to create tables
+ public static final Schema SCHEMA = new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get())
+ );
+
+ public static final List<Record> RECORDS = TestHelper.RecordsBuilder.newInstance(SCHEMA)
+ .add(29, "a")
+ .add(43, "b")
+ .add(61, "c")
+ .add(89, "d")
+ .add(100, "e")
+ .add(121, "f")
+ .add(122, "g")
+ .build();
+
+ private final HadoopTables tables = new HadoopTables(new HiveConf());
+ private TestHelper helper;
+ protected Table table;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Parameterized.Parameter(0)
+ public FileFormat fileFormat;
+
+ @Parameterized.Parameter(1)
+ public boolean partitioned;
+
+ @Parameterized.Parameters(name = "fileFormat={0}, partitioned={1}")
+ public static Collection<Object[]> parameters() {
+ return Lists.newArrayList(new Object[][] {
+ { FileFormat.PARQUET, true },
+ { FileFormat.ORC, true },
+ { FileFormat.AVRO, true },
+ { FileFormat.PARQUET, false },
+// Skip this until the ORC reader is fixed - test only issue
+// { FileFormat.ORC, false },
+ { FileFormat.AVRO, false }
+ });
+ }
+
+ @Before
+ public void init() throws IOException {
+ File location = temp.newFolder(fileFormat.name());
+ Assert.assertTrue(location.delete());
+
+ PartitionSpec spec = !partitioned ? PartitionSpec.unpartitioned() :
+ PartitionSpec.builderFor(SCHEMA)
+ .bucket("data", 3)
+ .build();
+ this.helper = new TestHelper(new HiveConf(), tables, location.toString(), SCHEMA, spec, fileFormat, temp);
+ this.table = helper.createTable();
+ helper.appendToTable(RECORDS);
+
+ TableOperations ops = ((BaseTable) table).operations();
+ TableMetadata meta = ops.current();
+ ops.commit(meta, meta.upgradeToFormatVersion(2));
+ }
+
+ @After
+ public void cleanUp() {
+ tables.dropTable(helper.table().location());
+ }
+
+ protected StructLikeSet rowSetWithoutIds(List<Record> records, Set<Integer> idToDelete) {
+ StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+ records.stream()
+ .filter(row -> !idToDelete.contains(row.getField("id")))
+ .forEach(set::add);
+ return set;
+ }
+
+ protected static List<GenericRecord> readRecords(Table table, Schema schema) throws IOException {
+ List<GenericRecord> records = Lists.newArrayList();
+ try (CloseableIterable<Record> reader = IcebergGenerics2.read(table).project(schema).build()) {
+ // For these tests we can be sure that the records are GenericRecords, and we rely on that to access fields easily
+ reader.forEach(record -> records.add((GenericRecord) record));
+ }
+
+ return records;
+ }
+
+ protected static StructLikeSet actualRowSet(Table table) throws IOException {
+ StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+ readRecords(table, table.schema()).forEach(set::add);
+ return set;
+ }
+
+ protected static Schema schemaWithMeta(Table table) {
+ List<Types.NestedField> cols = Lists.newArrayListWithCapacity(table.schema().columns().size() + 4);
+ cols.addAll(table.schema().columns());
+ cols.add(MetadataColumns.ROW_POSITION);
+ cols.add(MetadataColumns.FILE_PATH);
+ cols.add(MetadataColumns.SPEC_ID);
+ cols.add(MetadataColumns.metadataColumn(table, MetadataColumns.PARTITION_COLUMN_NAME));
+ return new Schema(cols);
+ }
+}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergDeleteWriter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergDeleteWriter.java
new file mode 100644
index 0000000000..6b6e9c4b1e
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergDeleteWriter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestHiveIcebergDeleteWriter extends HiveIcebergWriterTestBase {
+ private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
+ private static final JobID JOB_ID = new JobID("test", 0);
+ private static final Set<Integer> DELETED_IDS = Sets.newHashSet(29, 61, 89, 100, 122);
+
+ @Test
+ public void testDelete() throws IOException {
+ HiveIcebergDeleteWriter testWriter = deleteWriter();
+
+ List<GenericRecord> deleteRecords = deleteRecords(table, DELETED_IDS);
+
+ Collections.sort(deleteRecords,
+ Comparator.comparing(a -> a.getField(MetadataColumns.PARTITION_COLUMN_NAME).toString()));
+
+ Container<Record> container = new Container<>();
+ for (Record deleteRecord : deleteRecords) {
+ container.set(deleteRecord);
+ testWriter.write(container);
+ }
+
+ testWriter.close(false);
+
+ RowDelta rowDelta = table.newRowDelta();
+ testWriter.files().deleteFiles().forEach(rowDelta::addDeletes);
+ rowDelta.commit();
+
+ StructLikeSet expected = rowSetWithoutIds(RECORDS, DELETED_IDS);
+ StructLikeSet actual = actualRowSet(table);
+
+ Assert.assertEquals("Table should contain expected rows", expected, actual);
+ }
+
+ private static List<GenericRecord> deleteRecords(Table table, Set<Integer> idsToRemove)
+ throws IOException {
+ List<GenericRecord> deleteRecords = Lists.newArrayListWithExpectedSize(idsToRemove.size());
+ for (GenericRecord record : readRecords(table, schemaWithMeta(table))) {
+ if (!idsToRemove.contains(record.getField("id"))) {
+ continue;
+ }
+
+ GenericRecord deleteRecord = GenericRecord.create(IcebergAcidUtil.createSerdeSchemaForDelete(SCHEMA.columns()));
+ int specId = (Integer) record.getField(MetadataColumns.SPEC_ID.name());
+ deleteRecord.setField(MetadataColumns.SPEC_ID.name(), specId);
+ PartitionKey partitionKey = new PartitionKey(table.specs().get(specId), table.schema());
+ partitionKey.partition(record);
+ deleteRecord.setField(MetadataColumns.PARTITION_COLUMN_NAME, partitionKey);
+ deleteRecord.setField(MetadataColumns.FILE_PATH.name(), record.getField(MetadataColumns.FILE_PATH.name()));
+ deleteRecord.setField(MetadataColumns.ROW_POSITION.name(), record.getField(MetadataColumns.ROW_POSITION.name()));
+
+ SCHEMA.columns().forEach(field -> deleteRecord.setField(field.name(), record.getField(field.name())));
+ deleteRecords.add(deleteRecord);
+ }
+ return deleteRecords;
+ }
+
+ private HiveIcebergDeleteWriter deleteWriter() {
+ OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(table, 1, 2)
+ .format(fileFormat)
+ .operationId("3")
+ .build();
+
+ HiveFileWriterFactory hfwf = new HiveFileWriterFactory(table, fileFormat, SCHEMA, null, fileFormat, null, null,
+ null, null);
+
+ TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, 1, 0);
+
+ return new HiveIcebergDeleteWriter(table.schema(), table.specs(), fileFormat, hfwf, outputFileFactory, table.io(),
+ TARGET_FILE_SIZE, taskId, "partitioned");
+ }
+}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 4896f0899e..5857d858f5 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -59,7 +59,7 @@ import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
-import static org.apache.iceberg.mr.hive.HiveIcebergWriter.getWriters;
+import static org.apache.iceberg.mr.hive.HiveIcebergWriterBase.getWriters;
import static org.apache.iceberg.types.Types.NestedField.required;
public class TestHiveIcebergOutputCommitter {
@@ -288,7 +288,7 @@ public class TestHiveIcebergOutputCommitter {
HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, table.specs(),
table.spec().specId(), fileFormat, hfwf, outputFileFactory, io, TARGET_FILE_SIZE,
- TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME));
+ TezUtil.taskAttemptWrapper(taskId), conf.get(Catalogs.NAME), false);
Container<Record> container = new Container<>();
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergUpdateWriter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergUpdateWriter.java
new file mode 100644
index 0000000000..37df7803ab
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergUpdateWriter.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestHiveIcebergUpdateWriter extends HiveIcebergWriterTestBase {
+ private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
+ private static final JobID JOB_ID = new JobID("test", 0);
+ private static final TaskAttemptID TASK_ATTEMPT_ID =
+ new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, 0, 0);
+ private static final Map<Integer, GenericRecord> UPDATED_RECORDS = ImmutableMap.of(
+ 29, record(29, "d"),
+ 61, record(61, "h"),
+ 89, record(81, "a"),
+ 100, record(132, "e"),
+ 122, record(142, "k"));
+
+ /**
+ * This test just sends the data through the DeleteWriter and makes sure that the correct rows are removed.
+ * @throws IOException If there is an error
+ */
+ @Test
+ public void testDelete() throws IOException {
+ HiveIcebergWriter testWriter = new HiveIcebergBufferedDeleteWriter(table.schema(), table.specs(), fileFormat,
+ hiveFileWriterFactory(), outputFileFactory(), table.io(), TARGET_FILE_SIZE, new HiveConf());
+
+ update(table, testWriter);
+
+ StructLikeSet expected = rowSetWithoutIds(RECORDS, UPDATED_RECORDS.keySet());
+ StructLikeSet actual = actualRowSet(table);
+
+ Assert.assertEquals("Table should contain expected rows", expected, actual);
+ }
+
+ /**
+ * This test uses the UpdateWriter to check that the values are correctly updated.
+ * @throws IOException If there is an error
+ */
+ @Test
+ public void testUpdate() throws IOException {
+ HiveIcebergWriter testWriter = new HiveIcebergUpdateWriter(table.schema(), table.specs(), table.spec().specId(),
+ fileFormat, hiveFileWriterFactory(), outputFileFactory(), table.io(), TARGET_FILE_SIZE, TASK_ATTEMPT_ID,
+ "table_name", new HiveConf());
+
+ update(table, testWriter);
+
+ StructLikeSet expected = rowSetWithoutIds(RECORDS, UPDATED_RECORDS.keySet());
+ expected.addAll(UPDATED_RECORDS.values());
+ StructLikeSet actual = actualRowSet(table);
+
+ Assert.assertEquals("Table should contain expected rows", expected, actual);
+ }
+
+ private OutputFileFactory outputFileFactory() {
+ return OutputFileFactory.builderFor(table, 1, 2)
+ .format(fileFormat)
+ .operationId("3")
+ .build();
+ }
+
+ private HiveFileWriterFactory hiveFileWriterFactory() {
+ return new HiveFileWriterFactory(table, fileFormat, SCHEMA, null, fileFormat, null, null,
+ null, null);
+ }
+
+ private static void update(Table table, HiveIcebergWriter testWriter) throws IOException {
+ List<GenericRecord> updateRecords = updateRecords(table, UPDATED_RECORDS);
+
+ Collections.sort(updateRecords, Comparator.comparing(a -> a.getField("data").toString()));
+
+ Container<Record> container = new Container<>();
+ for (Record deleteRecord : updateRecords) {
+ container.set(deleteRecord);
+ testWriter.write(container);
+ }
+
+ testWriter.close(false);
+
+ RowDelta rowDelta = table.newRowDelta();
+ testWriter.files().deleteFiles().forEach(rowDelta::addDeletes);
+ testWriter.files().dataFiles().forEach(rowDelta::addRows);
+ rowDelta.commit();
+ }
+
+ private static List<GenericRecord> updateRecords(Table table, Map<Integer, GenericRecord> updated)
+ throws IOException {
+ List<GenericRecord> updateRecords = Lists.newArrayListWithExpectedSize(updated.size());
+ for (GenericRecord record : readRecords(table, schemaWithMeta(table))) {
+ if (!updated.keySet().contains(record.getField("id"))) {
+ continue;
+ }
+
+ GenericRecord updateRecord = GenericRecord.create(IcebergAcidUtil.createSerdeSchemaForUpdate(SCHEMA.columns()));
+ int specId = (Integer) record.getField(MetadataColumns.SPEC_ID.name());
+ updateRecord.setField(MetadataColumns.SPEC_ID.name(), specId);
+ PartitionKey partitionKey = new PartitionKey(table.specs().get(specId), table.schema());
+ partitionKey.partition(record);
+ updateRecord.setField(MetadataColumns.PARTITION_COLUMN_NAME, partitionKey);
+ updateRecord.setField(MetadataColumns.FILE_PATH.name(), record.getField(MetadataColumns.FILE_PATH.name()));
+ updateRecord.setField(MetadataColumns.ROW_POSITION.name(), record.getField(MetadataColumns.ROW_POSITION.name()));
+
+ SCHEMA.columns().forEach(field -> {
+ updateRecord.setField(field.name(), updated.get(record.getField("id")).getField(field.name()));
+ updateRecord.setField("__old_value_for_" + field.name(), record.getField(field.name()));
+ });
+ updateRecords.add(updateRecord);
+ }
+ return updateRecords;
+ }
+
+ private static GenericRecord record(Integer id, String data) {
+ GenericRecord record = GenericRecord.create(SCHEMA);
+ record.setField("id", id);
+ record.setField("data", data);
+ return record;
+ }
+}
diff --git a/iceberg/pom.xml b/iceberg/pom.xml
index adfea1b5eb..b742f19398 100644
--- a/iceberg/pom.xml
+++ b/iceberg/pom.xml
@@ -182,6 +182,12 @@
<classifier>tests</classifier>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <classifier>tests</classifier>
+ <version>${iceberg.version}</version>
+ </dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>