Posted to commits@iceberg.apache.org by op...@apache.org on 2022/03/28 03:55:31 UTC
[iceberg] branch master updated: ORC: Support estimated length for unclosed file. (#3784)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6d39f3c ORC: Support estimated length for unclosed file. (#3784)
6d39f3c is described below
commit 6d39f3c5d878201ab4a3c073e4655f9798b18962
Author: liliwei <hi...@gmail.com>
AuthorDate: Mon Mar 28 11:55:15 2022 +0800
ORC: Support estimated length for unclosed file. (#3784)
---
build.gradle | 1 +
.../java/org/apache/iceberg/io/BaseTaskWriter.java | 4 +-
.../org/apache/iceberg/io/ClusteredDataWriter.java | 9 +-
.../iceberg/io/ClusteredEqualityDeleteWriter.java | 9 +-
.../iceberg/io/ClusteredPositionDeleteWriter.java | 9 +-
.../org/apache/iceberg/io/FanoutDataWriter.java | 9 +-
.../org/apache/iceberg/io/TestBaseTaskWriter.java | 1 +
.../apache/iceberg/io/TestRollingFileWriters.java | 7 +-
.../flink/actions/TestRewriteDataFilesAction.java | 5 +-
.../flink/sink/TestIcebergStreamWriter.java | 4 -
.../apache/iceberg/flink/sink/TestTaskWriters.java | 4 -
.../iceberg/orc/EstimateOrcAvgWidthVisitor.java | 75 ++++++++
.../org/apache/iceberg/orc/OrcFileAppender.java | 54 +++++-
.../orc/TestEstimateOrcAvgWidthVisitor.java | 202 +++++++++++++++++++++
.../apache/iceberg/spark/source/SparkWrite.java | 9 +-
.../iceberg/spark/source/TestSparkDataWrite.java | 16 +-
16 files changed, 345 insertions(+), 73 deletions(-)
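At a high level, the change lets ORC appenders report an estimated length while the file is still open, which is what allows the ORC-specific guards below (rolling writers, target-file-size checks) to be removed. The estimate combines the bytes already flushed to completed stripes with a discounted guess at the data still buffered in memory. A sketch of the core computation, using the names that appear in OrcFileAppender.length() further down (the 0.2 factor seems to discount the uncompressed in-memory bytes for compression; that reading is an inference, not stated in the patch):

    estimatedLength = dataLength + (estimateMemory + batch.size * avgRowByteSize) * 0.2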
diff --git a/build.gradle b/build.gradle
index 63d257f..8fe5ebc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -457,6 +457,7 @@ project(':iceberg-orc') {
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
+ implementation project(':iceberg-common')
implementation project(':iceberg-core')
implementation("org.apache.avro:avro") {
exclude group: 'org.tukaani' // xz compression is not supported
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 0d927fb..386405a 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -280,9 +280,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
}
private boolean shouldRollToNewFile() {
- // TODO: ORC file now not support target file size before closed
- return !format.equals(FileFormat.ORC) &&
- currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize;
+ return currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize;
}
private void closeCurrent() throws IOException {
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
index a6982cd..a4acd2f 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
@@ -24,7 +24,6 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
/**
@@ -52,13 +51,7 @@ public class ClusteredDataWriter<T> extends ClusteredWriter<T, DataWriteResult>
@Override
protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
- // TODO: support ORC rolling writers
- if (fileFormat == FileFormat.ORC) {
- EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
- return writerFactory.newDataWriter(outputFile, spec, partition);
- } else {
- return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
- }
+ return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
index 385d1a5..976165f 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
@@ -24,7 +24,6 @@ import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -53,13 +52,7 @@ public class ClusteredEqualityDeleteWriter<T> extends ClusteredWriter<T, DeleteW
@Override
protected FileWriter<T, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
- // TODO: support ORC rolling writers
- if (fileFormat == FileFormat.ORC) {
- EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
- return writerFactory.newEqualityDeleteWriter(outputFile, spec, partition);
- } else {
- return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
- }
+ return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
index ea11838..53336ff 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
@@ -25,7 +25,6 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
@@ -56,13 +55,7 @@ public class ClusteredPositionDeleteWriter<T> extends ClusteredWriter<PositionDe
@Override
protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
- // TODO: support ORC rolling writers
- if (fileFormat == FileFormat.ORC) {
- EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
- return writerFactory.newPositionDeleteWriter(outputFile, spec, partition);
- } else {
- return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
- }
+ return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
index d6e16a7..54ccff0 100644
--- a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
@@ -24,7 +24,6 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
/**
@@ -52,13 +51,7 @@ public class FanoutDataWriter<T> extends FanoutWriter<T, DataWriteResult> {
@Override
protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
- // TODO: support ORC rolling writers
- if (fileFormat == FileFormat.ORC) {
- EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
- return writerFactory.newDataWriter(outputFile, spec, partition);
- } else {
- return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
- }
+ return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
}
@Override
diff --git a/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java b/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
index 39771fa..9bb7627 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
@@ -58,6 +58,7 @@ public class TestBaseTaskWriter extends TableTestBase {
public static Object[][] parameters() {
return new Object[][] {
{"avro"},
+ {"orc"},
{"parquet"}
};
}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
index efa756f..5c11d34 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
@@ -30,7 +30,6 @@ import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
-import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -39,8 +38,6 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public abstract class TestRollingFileWriters<T> extends WriterTestBase<T> {
- // TODO: add ORC once we support ORC rolling file writers
-
@Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}")
public static Object[] parameters() {
return new Object[][] {
@@ -48,6 +45,8 @@ public abstract class TestRollingFileWriters<T> extends WriterTestBase<T> {
new Object[]{FileFormat.AVRO, true},
new Object[]{FileFormat.PARQUET, false},
new Object[]{FileFormat.PARQUET, true},
+ new Object[]{FileFormat.ORC, false},
+ new Object[]{FileFormat.ORC, true}
};
}
@@ -129,8 +128,6 @@ public abstract class TestRollingFileWriters<T> extends WriterTestBase<T> {
@Test
public void testRollingEqualityDeleteWriterNoRecords() throws IOException {
- Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
-
List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
Schema equalityDeleteRowSchema = table.schema().select("id");
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
index 397bd6e..03cdcd8 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
@@ -23,6 +23,7 @@ package org.apache.iceberg.flink.actions;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.CoreOptions;
@@ -50,7 +51,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
-import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -322,7 +322,6 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
*/
@Test
public void testRewriteAvoidRepeateCompress() throws IOException {
- Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC));
List<Record> expected = Lists.newArrayList();
Schema schema = icebergTableUnPartitioned.schema();
GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
@@ -331,7 +330,7 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
try (FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
long filesize = 20000;
for (; fileAppender.length() < filesize; count++) {
- Record record = SimpleDataUtil.createRecord(count, "iceberg");
+ Record record = SimpleDataUtil.createRecord(count, UUID.randomUUID().toString());
fileAppender.add(record);
expected.add(record);
}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index 7419775..86c2f66 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -233,10 +233,6 @@ public class TestIcebergStreamWriter {
@Test
public void testTableWithTargetFileSize() throws Exception {
- // TODO: ORC file does not support target file size before closed.
- if (format == FileFormat.ORC) {
- return;
- }
// Adjust the target-file-size in table properties.
table.updateProperties()
.set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; low enough to trigger
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index 562a75e..2595b09 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -183,10 +183,6 @@ public class TestTaskWriters {
@Test
public void testRollingWithTargetFileSize() throws IOException {
- // TODO ORC don't support target file size before closed.
- if (format == FileFormat.ORC) {
- return;
- }
try (TaskWriter<RowData> taskWriter = createTaskWriter(4)) {
List<RowData> rows = Lists.newArrayListWithCapacity(8000);
List<Record> records = Lists.newArrayListWithCapacity(8000);
diff --git a/orc/src/main/java/org/apache/iceberg/orc/EstimateOrcAvgWidthVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/EstimateOrcAvgWidthVisitor.java
new file mode 100644
index 0000000..af0c3b6
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/EstimateOrcAvgWidthVisitor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.util.List;
+import java.util.Optional;
+import org.apache.orc.TypeDescription;
+
+public class EstimateOrcAvgWidthVisitor extends OrcSchemaVisitor<Integer> {
+
+ @Override
+ public Integer record(TypeDescription record, List<String> names, List<Integer> fieldWidths) {
+ return fieldWidths.stream().reduce(Integer::sum).orElse(0);
+ }
+
+ @Override
+ public Integer list(TypeDescription array, Integer elementWidth) {
+ return elementWidth;
+ }
+
+ @Override
+ public Integer map(TypeDescription map, Integer keyWidth, Integer valueWidth) {
+ return keyWidth + valueWidth;
+ }
+
+ @Override
+ public Integer primitive(TypeDescription primitive) {
+ Optional<Integer> icebergIdOpt = ORCSchemaUtil.icebergID(primitive);
+
+ if (!icebergIdOpt.isPresent()) {
+ return 0;
+ }
+
+ switch (primitive.getCategory()) {
+ case BYTE:
+ case CHAR:
+ case SHORT:
+ case INT:
+ case FLOAT:
+ case BOOLEAN:
+ case LONG:
+ case DOUBLE:
+ case DATE:
+ return 8;
+ case TIMESTAMP:
+ case TIMESTAMP_INSTANT:
+ return 12;
+ case STRING:
+ case VARCHAR:
+ case BINARY:
+ return 128;
+ case DECIMAL:
+ return primitive.getPrecision() + 2;
+ default:
+ throw new IllegalArgumentException("Can't handle " + primitive);
+ }
+ }
+}
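For reference, a minimal usage sketch of the visitor, mirroring the appender and test code elsewhere in this commit. It assumes an existing org.apache.iceberg.Schema named "schema"; the width sum is 0 when no field carries an Iceberg field ID attribute:

    import org.apache.iceberg.Schema;
    import org.apache.orc.TypeDescription;

    TypeDescription orcSchema = ORCSchemaUtil.convert(schema);
    int avgRowByteSize = OrcSchemaVisitor.visitSchema(orcSchema, new EstimateOrcAvgWidthVisitor())
        .stream()
        .reduce(Integer::sum)   // sum the per-field width estimates
        .orElse(0);             // 0 when the schema has no Iceberg-ID-bearing fields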
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index e946cda..8261839 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.orc;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynFields;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.io.FileAppender;
@@ -41,16 +43,23 @@ import org.apache.orc.Reader;
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
+import org.apache.orc.impl.writer.TreeWriter;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Create a file appender for ORC.
*/
class OrcFileAppender<D> implements FileAppender<D> {
+ private static final Logger LOG = LoggerFactory.getLogger(OrcFileAppender.class);
+
private final int batchSize;
private final OutputFile file;
private final Writer writer;
+ private final TreeWriter treeWriter;
private final VectorizedRowBatch batch;
+ private final int avgRowByteSize;
private final OrcRowWriter<D> valueWriter;
private boolean isClosed = false;
private final Configuration conf;
@@ -66,6 +75,14 @@ class OrcFileAppender<D> implements FileAppender<D> {
this.metricsConfig = metricsConfig;
TypeDescription orcSchema = ORCSchemaUtil.convert(schema);
+
+ this.avgRowByteSize =
+ OrcSchemaVisitor.visitSchema(orcSchema, new EstimateOrcAvgWidthVisitor()).stream().reduce(Integer::sum)
+ .orElse(0);
+ if (avgRowByteSize == 0) {
+ LOG.warn("The average length of the rows appears to be zero.");
+ }
+
this.batch = orcSchema.createRowBatch(this.batchSize);
OrcFile.WriterOptions options = OrcFile.writerOptions(conf).useUTCTimestamp(true);
@@ -74,6 +91,9 @@ class OrcFileAppender<D> implements FileAppender<D> {
}
options.setSchema(orcSchema);
this.writer = newOrcWriter(file, options, metadata);
+
+ // TODO: Access estimateMemorySize directly once ORC 1.7.4 is released with https://github.com/apache/orc/pull/1057.
+ this.treeWriter = treeWriterHiddenInORC();
this.valueWriter = newOrcRowWriter(schema, orcSchema, createWriterFunc);
}
@@ -86,7 +106,7 @@ class OrcFileAppender<D> implements FileAppender<D> {
batch.reset();
}
} catch (IOException ioe) {
- throw new RuntimeIOException(ioe, "Problem writing to ORC file %s", file.location());
+ throw new UncheckedIOException(String.format("Problem writing to ORC file %s", file.location()), ioe);
}
}
@@ -99,9 +119,29 @@ class OrcFileAppender<D> implements FileAppender<D> {
@Override
public long length() {
- Preconditions.checkState(isClosed,
- "Cannot return length while appending to an open file.");
- return file.toInputFile().getLength();
+ if (isClosed) {
+ return file.toInputFile().getLength();
+ }
+
+ Preconditions.checkNotNull(treeWriter,
+ "Cannot estimate length of file being written as the ORC writer's internal writer is not present");
+
+ long estimateMemory = treeWriter.estimateMemory();
+
+ long dataLength = 0;
+ try {
+ List<StripeInformation> stripes = writer.getStripes();
+ if (!stripes.isEmpty()) {
+ StripeInformation stripeInformation = stripes.get(stripes.size() - 1);
+ dataLength = stripeInformation != null ? stripeInformation.getOffset() + stripeInformation.getLength() : 0;
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(String.format("Can't get stripe length from the ORC file writer for path: %s.",
+ file.location()), e);
+ }
+
+ // This value is estimated, not actual.
+ return (long) Math.ceil(dataLength + (estimateMemory + (long) batch.size * avgRowByteSize) * 0.2);
}
@Override
@@ -153,4 +193,10 @@ class OrcFileAppender<D> implements FileAppender<D> {
createWriterFunc) {
return (OrcRowWriter<D>) createWriterFunc.apply(schema, orcSchema);
}
+
+ private TreeWriter treeWriterHiddenInORC() {
+ DynFields.BoundField<TreeWriter> treeWriterField =
+ DynFields.builder().hiddenImpl(writer.getClass(), "treeWriter").build(writer);
+ return treeWriterField.get();
+ }
}
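To make the length() estimate concrete, a worked example with hypothetical numbers (not taken from the patch): suppose the writer has flushed one 10 MiB stripe, TreeWriter.estimateMemory() reports 4 MiB buffered, 500 rows sit in the current unflushed batch, and the schema's estimated average row width is 128 bytes.

    long dataLength = 10_485_760L;      // last stripe's offset + length (10 MiB)
    long estimateMemory = 4_194_304L;   // buffered, uncompressed bytes (4 MiB)
    long pending = 500L * 128;          // batch.size * avgRowByteSize = 64_000
    long estimate = (long) Math.ceil(dataLength + (estimateMemory + pending) * 0.2);
    // = ceil(10_485_760 + 4_258_304 * 0.2) = 11_337_421 bytes (~10.8 MiB)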
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestEstimateOrcAvgWidthVisitor.java b/orc/src/test/java/org/apache/iceberg/orc/TestEstimateOrcAvgWidthVisitor.java
new file mode 100644
index 0000000..aca95ef
--- /dev/null
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestEstimateOrcAvgWidthVisitor.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestEstimateOrcAvgWidthVisitor {
+
+ // all supported fields
+ protected static final Types.NestedField ID_FIELD = required(1, "id", Types.IntegerType.get());
+ protected static final Types.NestedField DATA_FIELD = optional(2, "data", Types.StringType.get());
+ protected static final Types.NestedField FLOAT_FIELD = required(3, "float", Types.FloatType.get());
+ protected static final Types.NestedField DOUBLE_FIELD = optional(4, "double", Types.DoubleType.get());
+ protected static final Types.NestedField DECIMAL_FIELD = optional(5, "decimal", Types.DecimalType.of(5, 3));
+ protected static final Types.NestedField FIXED_FIELD = optional(7, "fixed", Types.FixedType.ofLength(4));
+ protected static final Types.NestedField BINARY_FIELD = optional(8, "binary", Types.BinaryType.get());
+ protected static final Types.NestedField FLOAT_LIST_FIELD = optional(9, "floatList",
+ Types.ListType.ofRequired(10, Types.FloatType.get()));
+ protected static final Types.NestedField LONG_FIELD = optional(11, "long", Types.LongType.get());
+ protected static final Types.NestedField BOOLEAN_FIELD = optional(12, "boolean", Types.BooleanType.get());
+ protected static final Types.NestedField TIMESTAMP_ZONE_FIELD = optional(13, "timestampZone",
+ Types.TimestampType.withZone());
+ protected static final Types.NestedField TIMESTAMP_FIELD = optional(14, "timestamp",
+ Types.TimestampType.withoutZone());
+ protected static final Types.NestedField DATE_FIELD = optional(15, "date", Types.DateType.get());
+ protected static final Types.NestedField UUID_FIELD = required(16, "uuid", Types.UUIDType.get());
+
+ protected static final Types.NestedField MAP_FIELD_1 = optional(17, "map1",
+ Types.MapType.ofOptional(18, 19, Types.FloatType.get(), Types.StringType.get())
+ );
+ protected static final Types.NestedField MAP_FIELD_2 = optional(20, "map2",
+ Types.MapType.ofOptional(21, 22, Types.IntegerType.get(), Types.DoubleType.get())
+ );
+ protected static final Types.NestedField STRUCT_FIELD = optional(23, "struct", Types.StructType.of(
+ required(24, "booleanField", Types.BooleanType.get()),
+ optional(25, "date", Types.DateType.get()),
+ optional(27, "timestamp", Types.TimestampType.withZone())
+ ));
+
+ @Test
+ public void testEstimateIntegerWidth() {
+ Schema integerSchema = new Schema(ID_FIELD);
+ TypeDescription integerOrcSchema = ORCSchemaUtil.convert(integerSchema);
+ long estimateLength = getEstimateLength(integerOrcSchema);
+ Assert.assertEquals("Estimated average length of integer must be 8.", 8, estimateLength);
+ }
+
+ @Test
+ public void testEstimateStringWidth() {
+ Schema stringSchema = new Schema(DATA_FIELD);
+ TypeDescription stringOrcSchema = ORCSchemaUtil.convert(stringSchema);
+ long estimateLength = getEstimateLength(stringOrcSchema);
+ Assert.assertEquals("Estimated average length of string must be 128.", 128, estimateLength);
+ }
+
+ @Test
+ public void testEstimateFloatWidth() {
+ Schema floatSchema = new Schema(FLOAT_FIELD);
+ TypeDescription floatOrcSchema = ORCSchemaUtil.convert(floatSchema);
+ long estimateLength = getEstimateLength(floatOrcSchema);
+ Assert.assertEquals("Estimated average length of float must be 8.", 8, estimateLength);
+ }
+
+ @Test
+ public void testEstimateDoubleWidth() {
+ Schema doubleSchema = new Schema(DOUBLE_FIELD);
+ TypeDescription doubleOrcSchema = ORCSchemaUtil.convert(doubleSchema);
+ long estimateLength = getEstimateLength(doubleOrcSchema);
+ Assert.assertEquals("Estimated average length of double must be 8.", 8, estimateLength);
+ }
+
+ @Test
+ public void testEstimateDecimalWidth() {
+ Schema decimalSchema = new Schema(DECIMAL_FIELD);
+ TypeDescription decimalOrcSchema = ORCSchemaUtil.convert(decimalSchema);
+ long estimateLength = getEstimateLength(decimalOrcSchema);
+ Assert.assertEquals("Estimated average length of decimal must be 7.", 7, estimateLength);
+ }
+
+ @Test
+ public void testEstimateFixedWidth() {
+ Schema fixedSchema = new Schema(FIXED_FIELD);
+ TypeDescription fixedOrcSchema = ORCSchemaUtil.convert(fixedSchema);
+ long estimateLength = getEstimateLength(fixedOrcSchema);
+ Assert.assertEquals("Estimated average length of fixed must be 128.", 128, estimateLength);
+ }
+
+ @Test
+ public void testEstimateBinaryWidth() {
+ Schema binarySchema = new Schema(BINARY_FIELD);
+ TypeDescription binaryOrcSchema = ORCSchemaUtil.convert(binarySchema);
+ long estimateLength = getEstimateLength(binaryOrcSchema);
+ Assert.assertEquals("Estimated average length of binary must be 128.", 128, estimateLength);
+ }
+
+ @Test
+ public void testEstimateListWidth() {
+ Schema listSchema = new Schema(FLOAT_LIST_FIELD);
+ TypeDescription listOrcSchema = ORCSchemaUtil.convert(listSchema);
+ long estimateLength = getEstimateLength(listOrcSchema);
+ Assert.assertEquals("Estimated average length of list must be 8.", 8, estimateLength);
+ }
+
+ @Test
+ public void testEstimateLongWidth() {
+ Schema longSchema = new Schema(LONG_FIELD);
+ TypeDescription longOrcSchema = ORCSchemaUtil.convert(longSchema);
+ long estimateLength = getEstimateLength(longOrcSchema);
+ Assert.assertEquals("Estimated average length of long must be 8.", 8, estimateLength);
+ }
+
+ @Test
+ public void testEstimateBooleanWidth() {
+ Schema booleanSchema = new Schema(BOOLEAN_FIELD);
+ TypeDescription booleanOrcSchema = ORCSchemaUtil.convert(booleanSchema);
+ long estimateLength = getEstimateLength(booleanOrcSchema);
+ Assert.assertEquals("Estimated average length of boolean must be 8.", 8, estimateLength);
+ }
+
+ @Test
+ public void testEstimateTimestampWidth() {
+ Schema timestampZoneSchema = new Schema(TIMESTAMP_ZONE_FIELD);
+ TypeDescription timestampZoneOrcSchema = ORCSchemaUtil.convert(timestampZoneSchema);
+ long estimateLength = getEstimateLength(timestampZoneOrcSchema);
+ Assert.assertEquals("Estimated average length of timestamps with zone must be 12.", 12, estimateLength);
+
+ Schema timestampSchema = new Schema(TIMESTAMP_FIELD);
+ TypeDescription timestampOrcSchema = ORCSchemaUtil.convert(timestampSchema);
+ estimateLength = getEstimateLength(timestampOrcSchema);
+ Assert.assertEquals("Estimated average length of timestamp must be 12.", 12, estimateLength);
+ }
+
+ @Test
+ public void testEstimateDateWidth() {
+ Schema dateSchema = new Schema(DATE_FIELD);
+ TypeDescription dateOrcSchema = ORCSchemaUtil.convert(dateSchema);
+ long estimateLength = getEstimateLength(dateOrcSchema);
+ Assert.assertEquals("Estimated average length of date must be 8.", 8, estimateLength);
+ }
+
+ @Test
+ public void testEstimateUUIDWidth() {
+ Schema uuidSchema = new Schema(UUID_FIELD);
+ TypeDescription uuidOrcSchema = ORCSchemaUtil.convert(uuidSchema);
+ long estimateLength = getEstimateLength(uuidOrcSchema);
+ Assert.assertEquals("Estimated average length of uuid must be 128.", 128, estimateLength);
+ }
+
+ @Test
+ public void testEstimateMapWidth() {
+ Schema mapSchema = new Schema(MAP_FIELD_1);
+ TypeDescription mapOrcSchema = ORCSchemaUtil.convert(mapSchema);
+ long estimateLength = getEstimateLength(mapOrcSchema);
+ Assert.assertEquals("Estimated average length of map must be 136.", 136, estimateLength);
+ }
+
+ @Test
+ public void testEstimateStructWidth() {
+ Schema structSchema = new Schema(STRUCT_FIELD);
+ TypeDescription structOrcSchema = ORCSchemaUtil.convert(structSchema);
+ long estimateLength = getEstimateLength(structOrcSchema);
+ Assert.assertEquals("Estimated average length of struct must be 28.", 28, estimateLength);
+ }
+
+ @Test
+ public void testEstimateFullWidth() {
+ Schema fullSchema = new Schema(ID_FIELD, DATA_FIELD, FLOAT_FIELD, DOUBLE_FIELD, DECIMAL_FIELD, FIXED_FIELD,
+ BINARY_FIELD, FLOAT_LIST_FIELD, LONG_FIELD, MAP_FIELD_1, MAP_FIELD_2, STRUCT_FIELD);
+ TypeDescription fullOrcSchema = ORCSchemaUtil.convert(fullSchema);
+ long estimateLength = getEstimateLength(fullOrcSchema);
+ Assert.assertEquals("Estimated average length of the row must be 611.", 611, estimateLength);
+ }
+
+ private Integer getEstimateLength(TypeDescription orcSchemaWithDate) {
+ return OrcSchemaVisitor.visitSchema(orcSchemaWithDate, new EstimateOrcAvgWidthVisitor())
+ .stream().reduce(Integer::sum).orElse(0);
+ }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index a669622..94f4614 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -42,7 +42,6 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ClusteredDataWriter;
@@ -636,13 +635,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
private UnpartitionedDataWriter(SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory,
FileIO io, PartitionSpec spec, FileFormat format, long targetFileSize) {
- // TODO: support ORC rolling writers
- if (format == FileFormat.ORC) {
- EncryptedOutputFile outputFile = fileFactory.newOutputFile();
- delegate = writerFactory.newDataWriter(outputFile, spec, null);
- } else {
- delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
- }
+ delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
this.io = io;
}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac..f924175 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -365,11 +365,9 @@ public class TestSparkDataWrite {
files.add(file);
}
}
- // TODO: ORC file now not support target file size
- if (!format.equals(FileFormat.ORC)) {
- Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
- Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
- }
+
+ Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
+ Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
}
@Test
@@ -585,11 +583,9 @@ public class TestSparkDataWrite {
files.add(file);
}
}
- // TODO: ORC file now not support target file size
- if (!format.equals(FileFormat.ORC)) {
- Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
- Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
- }
+
+ Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
+ Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
}
public enum IcebergOptionsType {