Posted to commits@iceberg.apache.org by op...@apache.org on 2021/10/18 10:44:49 UTC
[iceberg] branch master updated: ORC: Add DeleteWriteBuilder for format v2 (#3250)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 1b920e2 ORC: Add DeleteWriteBuilder for format v2 (#3250)
1b920e2 is described below
commit 1b920e2945d6fc48661735e884e4f1919e9cdf86
Author: pvary <pv...@cloudera.com>
AuthorDate: Mon Oct 18 12:44:35 2021 +0200
ORC: Add DeleteWriteBuilder for format v2 (#3250)
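This brings ORC to parity with the existing Avro and Parquet delete builders:
ORC gets its own DeleteWriteBuilder, BaseFileWriterFactory and DeleteFilter gain
ORC branches, and the Flink and Spark writer factories wire them up. A hedged
sketch of the new entry point (the OutputFile 'out', the Schema 'schema', and
field id 1 are illustrative assumptions):

    EqualityDeleteWriter<Record> eqWriter = ORC.writeDeletes(out)
        .createWriterFunc(GenericOrcWriter::buildWriter)
        .rowSchema(schema)
        .withSpec(PartitionSpec.unpartitioned())
        .equalityFieldIds(1)
        .overwrite()
        .buildEqualityWriter();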
---
.../main/java/org/apache/iceberg/avro/Avro.java | 4 +-
.../apache/iceberg/data/BaseFileWriterFactory.java | 33 +++-
.../java/org/apache/iceberg/data/DeleteFilter.java | 12 ++
.../iceberg/data/orc/TestOrcRowIterator.java | 8 +-
.../apache/iceberg/io/TestFileWriterFactory.java | 18 ++-
.../org/apache/iceberg/io/TestWriterMetrics.java | 3 -
.../apache/iceberg/flink/data/FlinkOrcWriter.java | 5 +
.../iceberg/flink/sink/FlinkFileWriterFactory.java | 11 ++
.../apache/iceberg/data/orc/GenericOrcReader.java | 0
.../apache/iceberg/data/orc/GenericOrcReaders.java | 0
.../apache/iceberg/data/orc/GenericOrcWriter.java | 5 +
.../apache/iceberg/data/orc/GenericOrcWriters.java | 37 +++++
orc/src/main/java/org/apache/iceberg/orc/ORC.java | 177 ++++++++++++++++++++-
.../java/org/apache/iceberg/orc/OrcRowWriter.java | 3 +
.../apache/iceberg/orc/TestOrcDeleteWriters.java | 61 +++----
.../java/org/apache/iceberg/parquet/Parquet.java | 6 +-
.../iceberg/parquet/TestParquetDeleteWriters.java | 3 +
.../apache/iceberg/spark/data/SparkOrcWriter.java | 5 +
.../spark/source/SparkFileWriterFactory.java | 11 ++
.../data/TestSparkOrcReadMetadataColumns.java | 6 +-
20 files changed, 357 insertions(+), 51 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index a1d210c..05c18a2 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -459,8 +459,7 @@ public class Avro {
public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
- Preconditions.checkArgument(spec != null,
- "Spec must not be null when creating position delete writer");
+ Preconditions.checkArgument(spec != null, "Spec must not be null when creating position delete writer");
Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
"Partition must not be null for partitioned writes");
@@ -476,6 +475,7 @@ public class Avro {
} else {
appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema());
+ // We ignore the 'createWriterFunc' and 'rowSchema' even if they are provided, since we do not write the row data itself
appenderBuilder.createWriterFunc(ignored -> new PositionDatumWriter());
}
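With this hunk, a position delete writer built without 'rowSchema' and
'createWriterFunc' falls back to the path/pos-only schema. A minimal sketch of
that fallback against the Avro builder above ('out' is an assumed OutputFile):

    try (PositionDeleteWriter<Void> writer = Avro.writeDeletes(out)
        .withSpec(PartitionSpec.unpartitioned())
        .overwrite()
        .buildPositionWriter()) {
      PositionDelete<Void> delete = PositionDelete.create();
      delete.set("s3://bucket/data/file.avro", 0L, null); // no row payload
      writer.write(delete);
    }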
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
index 62e09e6..3791d34 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
@@ -77,8 +77,9 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
protected abstract void configureEqualityDelete(Parquet.DeleteWriteBuilder builder);
protected abstract void configurePositionDelete(Parquet.DeleteWriteBuilder builder);
- // TODO: provide ways to configure ORC delete writers once we support them
protected abstract void configureDataWrite(ORC.DataWriteBuilder builder);
+ protected abstract void configureEqualityDelete(ORC.DeleteWriteBuilder builder);
+ protected abstract void configurePositionDelete(ORC.DeleteWriteBuilder builder);
@Override
public DataWriter<T> newDataWriter(EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
@@ -184,6 +185,22 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
return parquetBuilder.buildEqualityWriter();
+ case ORC:
+ ORC.DeleteWriteBuilder orcBuilder = ORC.writeDeletes(outputFile)
+ .setAll(properties)
+ .metricsConfig(metricsConfig)
+ .rowSchema(equalityDeleteRowSchema)
+ .equalityFieldIds(equalityFieldIds)
+ .withSpec(spec)
+ .withPartition(partition)
+ .withKeyMetadata(keyMetadata)
+ .withSortOrder(equalityDeleteSortOrder)
+ .overwrite();
+
+ configureEqualityDelete(orcBuilder);
+
+ return orcBuilder.buildEqualityWriter();
+
default:
throw new UnsupportedOperationException("Unsupported format for equality deletes: " + deleteFileFormat);
}
@@ -230,6 +247,20 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
return parquetBuilder.buildPositionWriter();
+ case ORC:
+ ORC.DeleteWriteBuilder orcBuilder = ORC.writeDeletes(outputFile)
+ .setAll(properties)
+ .metricsConfig(metricsConfig)
+ .rowSchema(positionDeleteRowSchema)
+ .withSpec(spec)
+ .withPartition(partition)
+ .withKeyMetadata(keyMetadata)
+ .overwrite();
+
+ configurePositionDelete(orcBuilder);
+
+ return orcBuilder.buildPositionWriter();
+
default:
throw new UnsupportedOperationException("Unsupported format for position deletes: " + deleteFileFormat);
}
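Callers keep going through the factory rather than the format builders; the
switch above dispatches on the configured delete file format. A short sketch of
the call site, mirroring the tests later in this patch ('writerFactory',
'outputFile', and 'table' are assumed):

    PositionDeleteWriter<Record> deleteWriter =
        writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null);

The null partition is only legal because the spec is unpartitioned; for a
partitioned spec the builder's precondition demands a partition value.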
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
index 148f9d9..40a77a1 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -34,11 +34,13 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -227,6 +229,16 @@ public abstract class DeleteFilter<T> {
return builder.build();
case ORC:
+ // Reusing containers is automatic for ORC. No need to set 'reuseContainers' here.
+ ORC.ReadBuilder orcBuilder = ORC.read(input)
+ .project(deleteSchema)
+ .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema));
+
+ if (deleteFile.content() == FileContent.POSITION_DELETES) {
+ orcBuilder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), dataFile.path()));
+ }
+
+ return orcBuilder.build();
default:
throw new UnsupportedOperationException(String.format(
"Cannot read deletes, %s is not a supported format: %s", deleteFile.format().name(), deleteFile.path()));
diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java b/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java
index cde3f1e..ab646b2 100644
--- a/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java
+++ b/data/src/test/java/org/apache/iceberg/data/orc/TestOrcRowIterator.java
@@ -76,10 +76,10 @@ public class TestOrcRowIterator {
.createWriterFunc(GenericOrcWriter::buildWriter)
.schema(DATA_SCHEMA)
// write in such a way that the file contains 2 stripes each with 4 row groups of 1000 rows
- .config("iceberg.orc.vectorbatch.size", "1000")
- .config(OrcConf.ROW_INDEX_STRIDE.getAttribute(), "1000")
- .config(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "4000")
- .config(OrcConf.STRIPE_SIZE.getAttribute(), "1")
+ .set("iceberg.orc.vectorbatch.size", "1000")
+ .set(OrcConf.ROW_INDEX_STRIDE.getAttribute(), "1000")
+ .set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "4000")
+ .set(OrcConf.STRIPE_SIZE.getAttribute(), "1")
.build()) {
writer.addAll(DATA_ROWS);
}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
index c3c4691..786b542 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
@@ -33,11 +33,13 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -131,8 +133,6 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
@Test
public void testEqualityDeleteWriter() throws IOException {
- Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
-
List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
Schema equalityDeleteRowSchema = table.schema().select("id");
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
@@ -179,7 +179,6 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
@Test
public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException {
- Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
Assume.assumeFalse("Table must start unpartitioned", partitioned);
List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
@@ -242,8 +241,6 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
@Test
public void testPositionDeleteWriter() throws IOException {
- Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
-
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
// write a data file
@@ -288,8 +285,6 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
@Test
public void testPositionDeleteWriterWithRow() throws IOException {
- Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
-
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), table.schema());
// write a data file
@@ -400,6 +395,15 @@ public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
return ImmutableList.copyOf(records);
}
+ case ORC:
+ try (CloseableIterable<Record> records = ORC.read(inputFile)
+ .project(schema)
+ .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
+ .build()) {
+
+ return ImmutableList.copyOf(records);
+ }
+
default:
throw new UnsupportedOperationException("Unsupported read file format: " + fileFormat);
}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
index 5909731..f52e422 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
@@ -41,7 +41,6 @@ import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
-import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -157,8 +156,6 @@ public abstract class TestWriterMetrics<T> {
@Test
public void testPositionDeleteMetrics() throws IOException {
- Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
-
FileWriterFactory<T> writerFactory = newWriterFactory(SCHEMA);
EncryptedOutputFile outputFile = fileFactory.newOutputFile();
PositionDeleteWriter<T> deleteWriter = writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null);
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
index 3f469b7..2eeb268 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
@@ -54,6 +54,11 @@ public class FlinkOrcWriter implements OrcRowWriter<RowData> {
}
@Override
+ public List<OrcValueWriter<?>> writers() {
+ return writer.writers();
+ }
+
+ @Override
public Stream<FieldMetrics<?>> metrics() {
return writer.metrics();
}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
index 6ce2542..55a9539 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
@@ -109,6 +109,17 @@ class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> implements S
builder.createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(dataFlinkType(), iSchema));
}
+ @Override
+ protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
+ builder.createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(equalityDeleteFlinkType(), iSchema));
+ }
+
+ @Override
+ protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
+ builder.createWriterFunc((iSchema, typDesc) -> FlinkOrcWriter.buildWriter(positionDeleteFlinkType(), iSchema));
+ builder.transformPaths(path -> StringData.fromString(path.toString()));
+ }
+
private RowType dataFlinkType() {
if (dataFlinkType == null) {
Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
similarity index 100%
rename from data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
rename to orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java
similarity index 100%
rename from data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java
rename to orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
similarity index 98%
rename from data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
rename to orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
index 4e0cb77..a136085 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
+++ b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
@@ -119,6 +119,11 @@ public class GenericOrcWriter implements OrcRowWriter<Record> {
}
@Override
+ public List<OrcValueWriter<?>> writers() {
+ return writer.writers();
+ }
+
+ @Override
public Stream<FieldMetrics<?>> metrics() {
return writer.metrics();
}
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
similarity index 92%
rename from data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
rename to orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
index e0d2c5a..5038d13 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
+++ b/orc/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.data.orc;
+import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -37,6 +38,8 @@ import java.util.stream.Stream;
import org.apache.iceberg.DoubleFieldMetrics;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.FloatFieldMetrics;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.orc.OrcRowWriter;
import org.apache.iceberg.orc.OrcValueWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -138,6 +141,11 @@ public class GenericOrcWriters {
return new MapWriter<>(key, value);
}
+ public static <T> OrcRowWriter<PositionDelete<T>> positionDelete(OrcRowWriter<T> writer,
+ Function<CharSequence, ?> pathTransformFunc) {
+ return new PositionDeleteStructWriter<>(writer, pathTransformFunc);
+ }
+
private static class BooleanWriter implements OrcValueWriter<Boolean> {
private static final OrcValueWriter<Boolean> INSTANCE = new BooleanWriter();
@@ -479,6 +487,35 @@ public class GenericOrcWriters {
protected abstract Object get(S struct, int index);
}
+ private static class PositionDeleteStructWriter<T> extends StructWriter<PositionDelete<T>>
+ implements OrcRowWriter<PositionDelete<T>> {
+ private final Function<CharSequence, ?> pathTransformFunc;
+
+ PositionDeleteStructWriter(OrcRowWriter<T> replacedWriter, Function<CharSequence, ?> pathTransformFunc) {
+ super(replacedWriter.writers());
+ this.pathTransformFunc = pathTransformFunc;
+ }
+
+ @Override
+ protected Object get(PositionDelete<T> delete, int index) {
+ switch (index) {
+ case 0:
+ return pathTransformFunc.apply(delete.path());
+ case 1:
+ return delete.pos();
+ case 2:
+ return delete.row();
+ }
+ throw new IllegalArgumentException("Cannot get value for invalid index: " + index);
+ }
+
+ @Override
+ public void write(PositionDelete<T> row, VectorizedRowBatch output) throws IOException {
+ Preconditions.checkArgument(row != null, "value must not be null");
+ writeRow(row, output);
+ }
+ }
+
private static void growColumnVector(ColumnVector cv, int requestedSize) {
if (cv.isNull.length < requestedSize) {
// Use growth factor of 3 to avoid frequent array allocations
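PositionDeleteStructWriter is what makes the ORC position-delete path work: it
borrows the wrapped writer's per-field value writers (the reason OrcRowWriter
gains a writers() method below) and maps the struct slots to path, pos, and row.
A small sketch of the wrapping ('posDeleteSchema' and 'typeDescription' are
assumed to describe the file_path/pos/row struct):

    OrcRowWriter<PositionDelete<Record>> deleteWriter = GenericOrcWriters.positionDelete(
        GenericOrcWriter.buildWriter(posDeleteSchema, typeDescription), Function.identity());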
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index b326c2f..b4936b1 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -22,9 +22,13 @@ package org.apache.iceberg.orc;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.FileFormat;
@@ -34,6 +38,10 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriters;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
@@ -41,11 +49,13 @@ import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.ArrayUtil;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcFile.ReaderOptions;
@@ -87,7 +97,19 @@ public class ORC {
return this;
}
+ /**
+ * Sets a configuration property for the writer.
+ * @param property The property to set
+ * @param value The value to set
+ * @return The resulting builder for chaining purposes
+ * @deprecated Please use {@link #set(String, String)} instead
+ */
+ @Deprecated
public WriteBuilder config(String property, String value) {
+ return set(property, value);
+ }
+
+ public WriteBuilder set(String property, String value) {
conf.set(property, value);
return this;
}
@@ -160,7 +182,7 @@ public class ORC {
}
public DataWriteBuilder set(String property, String value) {
- appenderBuilder.config(property, value);
+ appenderBuilder.set(property, value);
return this;
}
@@ -223,6 +245,159 @@ public class ORC {
}
}
+ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+ return new DeleteWriteBuilder(file);
+ }
+
+ public static class DeleteWriteBuilder {
+ private final WriteBuilder appenderBuilder;
+ private final String location;
+ private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc = null;
+ private Schema rowSchema = null;
+ private PartitionSpec spec = null;
+ private StructLike partition = null;
+ private EncryptionKeyMetadata keyMetadata = null;
+ private int[] equalityFieldIds = null;
+ private SortOrder sortOrder;
+ private Function<CharSequence, ?> pathTransformFunc = Function.identity();
+
+ private DeleteWriteBuilder(OutputFile file) {
+ this.appenderBuilder = write(file);
+ this.location = file.location();
+ }
+
+ public DeleteWriteBuilder forTable(Table table) {
+ rowSchema(table.schema());
+ withSpec(table.spec());
+ setAll(table.properties());
+ metricsConfig(MetricsConfig.forTable(table));
+ return this;
+ }
+
+ public DeleteWriteBuilder set(String property, String value) {
+ appenderBuilder.set(property, value);
+ return this;
+ }
+
+ public DeleteWriteBuilder setAll(Map<String, String> properties) {
+ appenderBuilder.setAll(properties);
+ return this;
+ }
+
+ public DeleteWriteBuilder meta(String property, String value) {
+ appenderBuilder.metadata(property, value);
+ return this;
+ }
+
+ public DeleteWriteBuilder overwrite() {
+ return overwrite(true);
+ }
+
+ public DeleteWriteBuilder overwrite(boolean enabled) {
+ appenderBuilder.overwrite(enabled);
+ return this;
+ }
+
+ public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+ appenderBuilder.metricsConfig(newMetricsConfig);
+ return this;
+ }
+
+ public DeleteWriteBuilder createWriterFunc(BiFunction<Schema, TypeDescription, OrcRowWriter<?>> newWriterFunc) {
+ this.createWriterFunc = newWriterFunc;
+ return this;
+ }
+
+ public DeleteWriteBuilder rowSchema(Schema newSchema) {
+ this.rowSchema = newSchema;
+ return this;
+ }
+
+ public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+ this.spec = newSpec;
+ return this;
+ }
+
+ public DeleteWriteBuilder withPartition(StructLike key) {
+ this.partition = key;
+ return this;
+ }
+
+ public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+ this.keyMetadata = metadata;
+ return this;
+ }
+
+ public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+ this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+ return this;
+ }
+
+ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+ this.equalityFieldIds = fieldIds;
+ return this;
+ }
+
+ public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransformFunc) {
+ this.pathTransformFunc = newPathTransformFunc;
+ return this;
+ }
+
+ public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
+ this.sortOrder = newSortOrder;
+ return this;
+ }
+
+ public <T> EqualityDeleteWriter<T> buildEqualityWriter() {
+ Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema");
+ Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+ Preconditions.checkState(createWriterFunc != null,
+ "Cannot create equality delete file unless createWriterFunc is set");
+ Preconditions.checkArgument(spec != null, "Spec must not be null when creating equality delete writer");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null for partitioned writes");
+
+ meta("delete-type", "equality");
+ meta("delete-field-ids", IntStream.of(equalityFieldIds)
+ .mapToObj(Objects::toString)
+ .collect(Collectors.joining(", ")));
+
+ // the appender uses the row schema without extra columns
+ appenderBuilder.schema(rowSchema);
+ appenderBuilder.createWriterFunc(createWriterFunc);
+
+ return new EqualityDeleteWriter<>(
+ appenderBuilder.build(), FileFormat.ORC, location, spec, partition, keyMetadata,
+ sortOrder, equalityFieldIds);
+ }
+
+ public <T> PositionDeleteWriter<T> buildPositionWriter() {
+ Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+ Preconditions.checkArgument(spec != null, "Spec must not be null when creating position delete writer");
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null for partitioned writes");
+
+ meta("delete-type", "position");
+
+ if (rowSchema != null && createWriterFunc != null) {
+ Schema deleteSchema = DeleteSchemaUtil.posDeleteSchema(rowSchema);
+ appenderBuilder.schema(deleteSchema);
+
+ appenderBuilder.createWriterFunc((schema, typeDescription) ->
+ GenericOrcWriters.positionDelete(createWriterFunc.apply(deleteSchema, typeDescription), pathTransformFunc));
+ } else {
+ appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema());
+
+ // We ignore the 'createWriterFunc' and 'rowSchema' even if they are provided, since we do not write the row data itself
+ appenderBuilder.createWriterFunc((schema, typeDescription) -> GenericOrcWriters.positionDelete(
+ GenericOrcWriter.buildWriter(schema, typeDescription), Function.identity()));
+ }
+
+ return new PositionDeleteWriter<>(
+ appenderBuilder.build(), FileFormat.ORC, location, spec, partition, keyMetadata);
+ }
+ }
+
public static ReadBuilder read(InputFile file) {
return new ReadBuilder(file);
}
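Taken together, the new builder mirrors the Parquet and Avro DeleteWriteBuilders.
A hedged end-to-end sketch of both writers (files, field id 1, and the deleted
rows are illustrative; note the renamed set() replacing the deprecated config()):

    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

    EqualityDeleteWriter<Record> eqWriter = ORC.writeDeletes(Files.localOutput(eqFile))
        .createWriterFunc(GenericOrcWriter::buildWriter)
        .rowSchema(schema)
        .withSpec(PartitionSpec.unpartitioned())
        .equalityFieldIds(1)
        .set("iceberg.orc.vectorbatch.size", "1000") // writer config now goes through set()
        .overwrite()
        .buildEqualityWriter();
    try (EqualityDeleteWriter<Record> writer = eqWriter) {
      writer.deleteAll(rowsToDelete); // rowsToDelete: an assumed List<Record>
    }

    PositionDeleteWriter<Record> posWriter = ORC.writeDeletes(Files.localOutput(posFile))
        .createWriterFunc(GenericOrcWriter::buildWriter)
        .rowSchema(schema) // drop rowSchema/createWriterFunc to store only path/pos
        .withSpec(PartitionSpec.unpartitioned())
        .overwrite()
        .buildPositionWriter();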
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
index 1f0ea13..0487b9c 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.orc;
import java.io.IOException;
+import java.util.List;
import java.util.stream.Stream;
import org.apache.iceberg.FieldMetrics;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
@@ -38,6 +39,8 @@ public interface OrcRowWriter<T> {
*/
void write(T row, VectorizedRowBatch output) throws IOException;
+ /** Returns the list of this writer's field value writers, so that wrapping writers can reuse them. */
+ List<OrcValueWriter<?>> writers();
+
/**
* Returns a stream of {@link FieldMetrics} that this OrcRowWriter keeps track of.
*/
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java b/orc/src/test/java/org/apache/iceberg/orc/TestOrcDeleteWriters.java
similarity index 75%
copy from parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java
copy to orc/src/test/java/org/apache/iceberg/orc/TestOrcDeleteWriters.java
index 77ceec1..5de06a0 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestOrcDeleteWriters.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.parquet;
+package org.apache.iceberg.orc;
import java.io.File;
import java.io.IOException;
@@ -31,9 +31,10 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
@@ -41,17 +42,16 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
-import org.apache.iceberg.types.Types.NestedField;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-public class TestParquetDeleteWriters {
+public class TestOrcDeleteWriters {
private static final Schema SCHEMA = new Schema(
- NestedField.required(1, "id", Types.LongType.get()),
- NestedField.optional(2, "data", Types.StringType.get()));
+ Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()));
private List<Record> records;
@@ -77,8 +77,8 @@ public class TestParquetDeleteWriters {
File deleteFile = temp.newFile();
OutputFile out = Files.localOutput(deleteFile);
- EqualityDeleteWriter<Record> deleteWriter = Parquet.writeDeletes(out)
- .createWriterFunc(GenericParquetWriter::buildWriter)
+ EqualityDeleteWriter<Record> deleteWriter = ORC.writeDeletes(out)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
.overwrite()
.rowSchema(SCHEMA)
.withSpec(PartitionSpec.unpartitioned())
@@ -90,16 +90,16 @@ public class TestParquetDeleteWriters {
}
DeleteFile metadata = deleteWriter.toDeleteFile();
- Assert.assertEquals("Format should be Parquet", FileFormat.PARQUET, metadata.format());
+ Assert.assertEquals("Format should be ORC", FileFormat.ORC, metadata.format());
Assert.assertEquals("Should be equality deletes", FileContent.EQUALITY_DELETES, metadata.content());
Assert.assertEquals("Record count should be correct", records.size(), metadata.recordCount());
Assert.assertEquals("Partition should be empty", 0, metadata.partition().size());
Assert.assertNull("Key metadata should be null", metadata.keyMetadata());
List<Record> deletedRecords;
- try (CloseableIterable<Record> reader = Parquet.read(out.toInputFile())
+ try (CloseableIterable<Record> reader = ORC.read(out.toInputFile())
.project(SCHEMA)
- .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(SCHEMA, fileSchema))
+ .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(SCHEMA, fileSchema))
.build()) {
deletedRecords = Lists.newArrayList(reader);
}
@@ -114,24 +114,26 @@ public class TestParquetDeleteWriters {
Schema deleteSchema = new Schema(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS,
- NestedField.optional(MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", SCHEMA.asStruct()));
+ Types.NestedField.optional(MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", SCHEMA.asStruct()));
- String deletePath = "s3://bucket/path/file.parquet";
+ String deletePath = "s3://bucket/path/file.orc";
GenericRecord posDelete = GenericRecord.create(deleteSchema);
List<Record> expectedDeleteRecords = Lists.newArrayList();
OutputFile out = Files.localOutput(deleteFile);
- PositionDeleteWriter<Record> deleteWriter = Parquet.writeDeletes(out)
- .createWriterFunc(GenericParquetWriter::buildWriter)
+ PositionDeleteWriter<Record> deleteWriter = ORC.writeDeletes(out)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
.overwrite()
.rowSchema(SCHEMA)
.withSpec(PartitionSpec.unpartitioned())
.buildPositionWriter();
+ PositionDelete<Record> positionDelete = PositionDelete.create();
try (PositionDeleteWriter<Record> writer = deleteWriter) {
for (int i = 0; i < records.size(); i += 1) {
int pos = i * 3 + 2;
- writer.delete(deletePath, pos, records.get(i));
+ positionDelete.set(deletePath, pos, records.get(i));
+ writer.write(positionDelete);
expectedDeleteRecords.add(posDelete.copy(ImmutableMap.of(
"file_path", deletePath,
"pos", (long) pos,
@@ -140,16 +142,16 @@ public class TestParquetDeleteWriters {
}
DeleteFile metadata = deleteWriter.toDeleteFile();
- Assert.assertEquals("Format should be Parquet", FileFormat.PARQUET, metadata.format());
+ Assert.assertEquals("Format should be ORC", FileFormat.ORC, metadata.format());
Assert.assertEquals("Should be position deletes", FileContent.POSITION_DELETES, metadata.content());
Assert.assertEquals("Record count should be correct", records.size(), metadata.recordCount());
Assert.assertEquals("Partition should be empty", 0, metadata.partition().size());
Assert.assertNull("Key metadata should be null", metadata.keyMetadata());
List<Record> deletedRecords;
- try (CloseableIterable<Record> reader = Parquet.read(out.toInputFile())
+ try (CloseableIterable<Record> reader = ORC.read(out.toInputFile())
.project(deleteSchema)
- .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
+ .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
.build()) {
deletedRecords = Lists.newArrayList(reader);
}
@@ -165,21 +167,26 @@ public class TestParquetDeleteWriters {
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS);
- String deletePath = "s3://bucket/path/file.parquet";
+ String deletePath = "s3://bucket/path/file.orc";
GenericRecord posDelete = GenericRecord.create(deleteSchema);
List<Record> expectedDeleteRecords = Lists.newArrayList();
OutputFile out = Files.localOutput(deleteFile);
- PositionDeleteWriter<Void> deleteWriter = Parquet.writeDeletes(out)
- .createWriterFunc(GenericParquetWriter::buildWriter)
+ PositionDeleteWriter<Void> deleteWriter = ORC.writeDeletes(out)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
+ .transformPaths(path -> {
+ throw new RuntimeException("Should not be called for performance reasons");
+ })
.buildPositionWriter();
+ PositionDelete<Void> positionDelete = PositionDelete.create();
try (PositionDeleteWriter<Void> writer = deleteWriter) {
for (int i = 0; i < records.size(); i += 1) {
int pos = i * 3 + 2;
- writer.delete(deletePath, pos, null);
+ positionDelete.set(deletePath, pos, null);
+ writer.write(positionDelete);
expectedDeleteRecords.add(posDelete.copy(ImmutableMap.of(
"file_path", deletePath,
"pos", (long) pos)));
@@ -187,16 +194,16 @@ public class TestParquetDeleteWriters {
}
DeleteFile metadata = deleteWriter.toDeleteFile();
- Assert.assertEquals("Format should be Parquet", FileFormat.PARQUET, metadata.format());
+ Assert.assertEquals("Format should be ORC", FileFormat.ORC, metadata.format());
Assert.assertEquals("Should be position deletes", FileContent.POSITION_DELETES, metadata.content());
Assert.assertEquals("Record count should be correct", records.size(), metadata.recordCount());
Assert.assertEquals("Partition should be empty", 0, metadata.partition().size());
Assert.assertNull("Key metadata should be null", metadata.keyMetadata());
List<Record> deletedRecords;
- try (CloseableIterable<Record> reader = Parquet.read(out.toInputFile())
+ try (CloseableIterable<Record> reader = ORC.read(out.toInputFile())
.project(deleteSchema)
- .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
+ .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
.build()) {
deletedRecords = Lists.newArrayList(reader);
}
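The tests also adopt the newer write path: instead of delete(path, pos, row), a
single PositionDelete container is created once and reused for every row. The
pattern in isolation ('writer' and 'dataFilePath' are assumed):

    PositionDelete<Record> delete = PositionDelete.create();
    for (long pos : new long[] {2L, 5L, 8L}) {
      delete.set(dataFilePath, pos, null); // the container is reused across rows
      writer.write(delete);
    }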
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 8d42fd5..316a363 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -568,7 +568,7 @@ public class Parquet {
}
public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
- Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema`");
+ Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema");
Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
Preconditions.checkState(createWriterFunc != null,
"Cannot create equality delete file unless createWriterFunc is set");
@@ -594,8 +594,7 @@ public class Parquet {
public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
- Preconditions.checkArgument(spec != null,
- "Spec must not be null when creating position delete writer");
+ Preconditions.checkArgument(spec != null, "Spec must not be null when creating position delete writer");
Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
"Partition must not be null for partitioned writes");
@@ -617,6 +616,7 @@ public class Parquet {
} else {
appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema());
+ // We ignore the 'createWriterFunc' and 'rowSchema' even if they are provided, since we do not write the row data itself
appenderBuilder.createWriterFunc(parquetSchema ->
new PositionDeleteStructWriter<T>((StructWriter<?>) GenericParquetWriter.buildWriter(parquetSchema),
Function.identity()));
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java
index 77ceec1..9af0719 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDeleteWriters.java
@@ -174,6 +174,9 @@ public class TestParquetDeleteWriters {
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
+ .transformPaths(path -> {
+ throw new RuntimeException("Should not be called for performance reasons");
+ })
.buildPositionWriter();
try (PositionDeleteWriter<Void> writer = deleteWriter) {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index 34292f2..9c7f3a6 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -61,6 +61,11 @@ public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
}
@Override
+ public List<OrcValueWriter<?>> writers() {
+ return writer.writers();
+ }
+
+ @Override
public Stream<FieldMetrics<?>> metrics() {
return writer.metrics();
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
index daff3b9..beaa7c2 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkFileWriterFactory.java
@@ -110,6 +110,17 @@ class SparkFileWriterFactory extends BaseFileWriterFactory<InternalRow> {
builder.createWriterFunc(SparkOrcWriter::new);
}
+ @Override
+ protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
+ builder.createWriterFunc(SparkOrcWriter::new);
+ }
+
+ @Override
+ protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
+ builder.createWriterFunc(SparkOrcWriter::new);
+ builder.transformPaths(path -> UTF8String.fromString(path.toString()));
+ }
+
private StructType dataSparkType() {
if (dataSparkType == null) {
Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
index 010f9f5..b8ee563 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
@@ -121,9 +121,9 @@ public class TestSparkOrcReadMetadataColumns {
.createWriterFunc(SparkOrcWriter::new)
.schema(DATA_SCHEMA)
// write in such a way that the file contains 10 stripes each with 100 rows
- .config("iceberg.orc.vectorbatch.size", "100")
- .config(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "100")
- .config(OrcConf.STRIPE_SIZE.getAttribute(), "1")
+ .set("iceberg.orc.vectorbatch.size", "100")
+ .set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), "100")
+ .set(OrcConf.STRIPE_SIZE.getAttribute(), "1")
.build()) {
writer.addAll(DATA_ROWS);
}