You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2022/03/28 03:55:31 UTC

[iceberg] branch master updated: ORC: Support estimated length for unclosed file. (#3784)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d39f3c  ORC: Support estimated length for unclosed file. (#3784)
6d39f3c is described below

commit 6d39f3c5d878201ab4a3c073e4655f9798b18962
Author: liliwei <hi...@gmail.com>
AuthorDate: Mon Mar 28 11:55:15 2022 +0800

    ORC: Support estimated length for unclosed file. (#3784)
---
 build.gradle                                       |   1 +
 .../java/org/apache/iceberg/io/BaseTaskWriter.java |   4 +-
 .../org/apache/iceberg/io/ClusteredDataWriter.java |   9 +-
 .../iceberg/io/ClusteredEqualityDeleteWriter.java  |   9 +-
 .../iceberg/io/ClusteredPositionDeleteWriter.java  |   9 +-
 .../org/apache/iceberg/io/FanoutDataWriter.java    |   9 +-
 .../org/apache/iceberg/io/TestBaseTaskWriter.java  |   1 +
 .../apache/iceberg/io/TestRollingFileWriters.java  |   7 +-
 .../flink/actions/TestRewriteDataFilesAction.java  |   5 +-
 .../flink/sink/TestIcebergStreamWriter.java        |   4 -
 .../apache/iceberg/flink/sink/TestTaskWriters.java |   4 -
 .../iceberg/orc/EstimateOrcAvgWidthVisitor.java    |  75 ++++++++
 .../org/apache/iceberg/orc/OrcFileAppender.java    |  54 +++++-
 .../orc/TestEstimateOrcAvgWidthVisitor.java        | 202 +++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkWrite.java    |   9 +-
 .../iceberg/spark/source/TestSparkDataWrite.java   |  16 +-
 16 files changed, 345 insertions(+), 73 deletions(-)

diff --git a/build.gradle b/build.gradle
index 63d257f..8fe5ebc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -457,6 +457,7 @@ project(':iceberg-orc') {
   dependencies {
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
     api project(':iceberg-api')
+    implementation project(':iceberg-common')
     implementation project(':iceberg-core')
     implementation("org.apache.avro:avro") {
       exclude group: 'org.tukaani' // xz compression is not supported
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index 0d927fb..386405a 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -280,9 +280,7 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
     }
 
     private boolean shouldRollToNewFile() {
-      // TODO: ORC file now not support target file size before closed
-      return !format.equals(FileFormat.ORC) &&
-          currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize;
+      return currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize;
     }
 
     private void closeCurrent() throws IOException {
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
index a6982cd..a4acd2f 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
@@ -24,7 +24,6 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 /**
@@ -52,13 +51,7 @@ public class ClusteredDataWriter<T> extends ClusteredWriter<T, DataWriteResult>
 
   @Override
   protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
-    // TODO: support ORC rolling writers
-    if (fileFormat == FileFormat.ORC) {
-      EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
-      return writerFactory.newDataWriter(outputFile, spec, partition);
-    } else {
-      return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
-    }
+    return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
index 385d1a5..976165f 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
@@ -24,7 +24,6 @@ import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
@@ -53,13 +52,7 @@ public class ClusteredEqualityDeleteWriter<T> extends ClusteredWriter<T, DeleteW
 
   @Override
   protected FileWriter<T, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
-    // TODO: support ORC rolling writers
-    if (fileFormat == FileFormat.ORC) {
-      EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
-      return writerFactory.newEqualityDeleteWriter(outputFile, spec, partition);
-    } else {
-      return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
-    }
+    return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
index ea11838..53336ff 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
@@ -25,7 +25,6 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.CharSequenceSet;
 
@@ -56,13 +55,7 @@ public class ClusteredPositionDeleteWriter<T> extends ClusteredWriter<PositionDe
 
   @Override
   protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
-    // TODO: support ORC rolling writers
-    if (fileFormat == FileFormat.ORC) {
-      EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
-      return writerFactory.newPositionDeleteWriter(outputFile, spec, partition);
-    } else {
-      return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
-    }
+    return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
index d6e16a7..54ccff0 100644
--- a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
@@ -24,7 +24,6 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 /**
@@ -52,13 +51,7 @@ public class FanoutDataWriter<T> extends FanoutWriter<T, DataWriteResult> {
 
   @Override
   protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
-    // TODO: support ORC rolling writers
-    if (fileFormat == FileFormat.ORC) {
-      EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
-      return writerFactory.newDataWriter(outputFile, spec, partition);
-    } else {
-      return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
-    }
+    return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
   }
 
   @Override
diff --git a/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java b/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
index 39771fa..9bb7627 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestBaseTaskWriter.java
@@ -58,6 +58,7 @@ public class TestBaseTaskWriter extends TableTestBase {
   public static Object[][] parameters() {
     return new Object[][] {
         {"avro"},
+        {"orc"},
         {"parquet"}
     };
   }
diff --git a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
index efa756f..5c11d34 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
@@ -30,7 +30,6 @@ import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,8 +38,6 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public abstract class TestRollingFileWriters<T> extends WriterTestBase<T> {
 
-  // TODO: add ORC once we support ORC rolling file writers
-
   @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}")
   public static Object[] parameters() {
     return new Object[][] {
@@ -48,6 +45,8 @@ public abstract class TestRollingFileWriters<T> extends WriterTestBase<T> {
         new Object[]{FileFormat.AVRO, true},
         new Object[]{FileFormat.PARQUET, false},
         new Object[]{FileFormat.PARQUET, true},
+        new Object[]{FileFormat.ORC, false},
+        new Object[]{FileFormat.ORC, true}
     };
   }
 
@@ -129,8 +128,6 @@ public abstract class TestRollingFileWriters<T> extends WriterTestBase<T> {
 
   @Test
   public void testRollingEqualityDeleteWriterNoRecords() throws IOException {
-    Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
-
     List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
     Schema equalityDeleteRowSchema = table.schema().select("id");
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
index 397bd6e..03cdcd8 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
@@ -23,6 +23,7 @@ package org.apache.iceberg.flink.actions;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.CoreOptions;
@@ -50,7 +51,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -322,7 +322,6 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
    */
   @Test
   public void testRewriteAvoidRepeateCompress() throws IOException {
-    Assume.assumeFalse("ORC does not support getting length when file is opening", format.equals(FileFormat.ORC));
     List<Record> expected = Lists.newArrayList();
     Schema schema = icebergTableUnPartitioned.schema();
     GenericAppenderFactory genericAppenderFactory = new GenericAppenderFactory(schema);
@@ -331,7 +330,7 @@ public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
     try (FileAppender<Record> fileAppender = genericAppenderFactory.newAppender(Files.localOutput(file), format)) {
       long filesize = 20000;
       for (; fileAppender.length() < filesize; count++) {
-        Record record = SimpleDataUtil.createRecord(count, "iceberg");
+        Record record = SimpleDataUtil.createRecord(count, UUID.randomUUID().toString());
         fileAppender.add(record);
         expected.add(record);
       }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index 7419775..86c2f66 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -233,10 +233,6 @@ public class TestIcebergStreamWriter {
 
   @Test
   public void testTableWithTargetFileSize() throws Exception {
-    // TODO: ORC file does not support target file size before closed.
-    if (format == FileFormat.ORC) {
-      return;
-    }
     // Adjust the target-file-size in table properties.
     table.updateProperties()
         .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, "4") // ~4 bytes; low enough to trigger
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index 562a75e..2595b09 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -183,10 +183,6 @@ public class TestTaskWriters {
 
   @Test
   public void testRollingWithTargetFileSize() throws IOException {
-    // TODO ORC don't support target file size before closed.
-    if (format == FileFormat.ORC) {
-      return;
-    }
     try (TaskWriter<RowData> taskWriter = createTaskWriter(4)) {
       List<RowData> rows = Lists.newArrayListWithCapacity(8000);
       List<Record> records = Lists.newArrayListWithCapacity(8000);
diff --git a/orc/src/main/java/org/apache/iceberg/orc/EstimateOrcAvgWidthVisitor.java b/orc/src/main/java/org/apache/iceberg/orc/EstimateOrcAvgWidthVisitor.java
new file mode 100644
index 0000000..af0c3b6
--- /dev/null
+++ b/orc/src/main/java/org/apache/iceberg/orc/EstimateOrcAvgWidthVisitor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import java.util.List;
+import java.util.Optional;
+import org.apache.orc.TypeDescription;
+
+public class EstimateOrcAvgWidthVisitor extends OrcSchemaVisitor<Integer> {
+
+  @Override
+  public Integer record(TypeDescription record, List<String> names, List<Integer> fieldWidths) {
+    return fieldWidths.stream().reduce(Integer::sum).orElse(0);
+  }
+
+  @Override
+  public Integer list(TypeDescription array, Integer elementWidth) {
+    return elementWidth;
+  }
+
+  @Override
+  public Integer map(TypeDescription map, Integer keyWidth, Integer valueWidth) {
+    return keyWidth + valueWidth;
+  }
+
+  @Override
+  public Integer primitive(TypeDescription primitive) {
+    Optional<Integer> icebergIdOpt = ORCSchemaUtil.icebergID(primitive);
+
+    if (!icebergIdOpt.isPresent()) {
+      return 0;
+    }
+
+    switch (primitive.getCategory()) {
+      case BYTE:
+      case CHAR:
+      case SHORT:
+      case INT:
+      case FLOAT:
+      case BOOLEAN:
+      case LONG:
+      case DOUBLE:
+      case DATE:
+        return 8;
+      case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
+        return 12;
+      case STRING:
+      case VARCHAR:
+      case BINARY:
+        return 128;
+      case DECIMAL:
+        return primitive.getPrecision() + 2;
+      default:
+        throw new IllegalArgumentException("Can't handle " + primitive);
+    }
+  }
+}
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
index e946cda..8261839 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.orc;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.common.DynFields;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.hadoop.HadoopOutputFile;
 import org.apache.iceberg.io.FileAppender;
@@ -41,16 +43,23 @@ import org.apache.orc.Reader;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
+import org.apache.orc.impl.writer.TreeWriter;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Create a file appender for ORC.
  */
 class OrcFileAppender<D> implements FileAppender<D> {
+  private static final Logger LOG = LoggerFactory.getLogger(OrcFileAppender.class);
+
   private final int batchSize;
   private final OutputFile file;
   private final Writer writer;
+  private final TreeWriter treeWriter;
   private final VectorizedRowBatch batch;
+  private final int avgRowByteSize;
   private final OrcRowWriter<D> valueWriter;
   private boolean isClosed = false;
   private final Configuration conf;
@@ -66,6 +75,14 @@ class OrcFileAppender<D> implements FileAppender<D> {
     this.metricsConfig = metricsConfig;
 
     TypeDescription orcSchema = ORCSchemaUtil.convert(schema);
+
+    this.avgRowByteSize =
+        OrcSchemaVisitor.visitSchema(orcSchema, new EstimateOrcAvgWidthVisitor()).stream().reduce(Integer::sum)
+            .orElse(0);
+    if (avgRowByteSize == 0) {
+      LOG.warn("The average length of the rows appears to be zero.");
+    }
+
     this.batch = orcSchema.createRowBatch(this.batchSize);
 
     OrcFile.WriterOptions options = OrcFile.writerOptions(conf).useUTCTimestamp(true);
@@ -74,6 +91,9 @@ class OrcFileAppender<D> implements FileAppender<D> {
     }
     options.setSchema(orcSchema);
     this.writer = newOrcWriter(file, options, metadata);
+
+    // TODO: Turn to access the estimateMemorySize directly after ORC 1.7.4 released with https://github.com/apache/orc/pull/1057.
+    this.treeWriter =  treeWriterHiddenInORC();
     this.valueWriter = newOrcRowWriter(schema, orcSchema, createWriterFunc);
   }
 
@@ -86,7 +106,7 @@ class OrcFileAppender<D> implements FileAppender<D> {
         batch.reset();
       }
     } catch (IOException ioe) {
-      throw new RuntimeIOException(ioe, "Problem writing to ORC file %s", file.location());
+      throw new UncheckedIOException(String.format("Problem writing to ORC file %s", file.location()), ioe);
     }
   }
 
@@ -99,9 +119,29 @@ class OrcFileAppender<D> implements FileAppender<D> {
 
   @Override
   public long length() {
-    Preconditions.checkState(isClosed,
-        "Cannot return length while appending to an open file.");
-    return file.toInputFile().getLength();
+    if (isClosed) {
+      return file.toInputFile().getLength();
+    }
+
+    Preconditions.checkNotNull(treeWriter,
+        "Cannot estimate length of file being written as the ORC writer's internal writer is not present");
+
+    long estimateMemory = treeWriter.estimateMemory();
+
+    long dataLength = 0;
+    try {
+      List<StripeInformation> stripes = writer.getStripes();
+      if (!stripes.isEmpty()) {
+        StripeInformation stripeInformation = stripes.get(stripes.size() - 1);
+        dataLength = stripeInformation != null ? stripeInformation.getOffset() + stripeInformation.getLength() : 0;
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(String.format("Can't get Stripe's length from the file writer with path: %s.",
+          file.location()), e);
+    }
+
+    // This value is estimated, not actual.
+    return (long) Math.ceil(dataLength + (estimateMemory + (long) batch.size * avgRowByteSize) * 0.2);
   }
 
   @Override
@@ -153,4 +193,10 @@ class OrcFileAppender<D> implements FileAppender<D> {
                                                          createWriterFunc) {
     return (OrcRowWriter<D>) createWriterFunc.apply(schema, orcSchema);
   }
+
+  private TreeWriter treeWriterHiddenInORC() {
+    DynFields.BoundField<TreeWriter> treeWriterFiled =
+        DynFields.builder().hiddenImpl(writer.getClass(), "treeWriter").build(writer);
+    return treeWriterFiled.get();
+  }
 }
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestEstimateOrcAvgWidthVisitor.java b/orc/src/test/java/org/apache/iceberg/orc/TestEstimateOrcAvgWidthVisitor.java
new file mode 100644
index 0000000..aca95ef
--- /dev/null
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestEstimateOrcAvgWidthVisitor.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.orc;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestEstimateOrcAvgWidthVisitor {
+
+  // all supported fields
+  protected static final Types.NestedField ID_FIELD = required(1, "id", Types.IntegerType.get());
+  protected static final Types.NestedField DATA_FIELD = optional(2, "data", Types.StringType.get());
+  protected static final Types.NestedField FLOAT_FIELD = required(3, "float", Types.FloatType.get());
+  protected static final Types.NestedField DOUBLE_FIELD = optional(4, "double", Types.DoubleType.get());
+  protected static final Types.NestedField DECIMAL_FIELD = optional(5, "decimal", Types.DecimalType.of(5, 3));
+  protected static final Types.NestedField FIXED_FIELD = optional(7, "fixed", Types.FixedType.ofLength(4));
+  protected static final Types.NestedField BINARY_FIELD = optional(8, "binary", Types.BinaryType.get());
+  protected static final Types.NestedField FLOAT_LIST_FIELD = optional(9, "floatList",
+      Types.ListType.ofRequired(10, Types.FloatType.get()));
+  protected static final Types.NestedField LONG_FIELD = optional(11, "long", Types.LongType.get());
+  protected static final Types.NestedField BOOLEAN_FIELD = optional(12, "boolean", Types.BooleanType.get());
+  protected static final Types.NestedField TIMESTAMP_ZONE_FIELD = optional(13, "timestampZone",
+      Types.TimestampType.withZone());
+  protected static final Types.NestedField TIMESTAMP_FIELD = optional(14, "timestamp",
+      Types.TimestampType.withoutZone());
+  protected static final Types.NestedField DATE_FIELD = optional(15, "date", Types.DateType.get());
+  protected static final Types.NestedField UUID_FIELD = required(16, "uuid", Types.UUIDType.get());
+
+  protected static final Types.NestedField MAP_FIELD_1 = optional(17, "map1",
+      Types.MapType.ofOptional(18, 19, Types.FloatType.get(), Types.StringType.get())
+  );
+  protected static final Types.NestedField MAP_FIELD_2 = optional(20, "map2",
+      Types.MapType.ofOptional(21, 22, Types.IntegerType.get(), Types.DoubleType.get())
+  );
+  protected static final Types.NestedField STRUCT_FIELD = optional(23, "struct", Types.StructType.of(
+      required(24, "booleanField", Types.BooleanType.get()),
+      optional(25, "date", Types.DateType.get()),
+      optional(27, "timestamp", Types.TimestampType.withZone())
+  ));
+
+  @Test
+  public void testEstimateIntegerWidth() {
+    Schema integerSchema = new Schema(ID_FIELD);
+    TypeDescription integerOrcSchema = ORCSchemaUtil.convert(integerSchema);
+    long estimateLength = getEstimateLength(integerOrcSchema);
+    Assert.assertEquals("Estimated average length of integer must be 8.", 8, estimateLength);
+  }
+
+  @Test
+  public void testEstimateStringWidth() {
+    Schema stringSchema = new Schema(DATA_FIELD);
+    TypeDescription stringOrcSchema = ORCSchemaUtil.convert(stringSchema);
+    long estimateLength = getEstimateLength(stringOrcSchema);
+    Assert.assertEquals("Estimated average length of string must be 128.", 128, estimateLength);
+  }
+
+  @Test
+  public void testEstimateFloatWidth() {
+    Schema floatSchema = new Schema(FLOAT_FIELD);
+    TypeDescription floatOrcSchema = ORCSchemaUtil.convert(floatSchema);
+    long estimateLength = getEstimateLength(floatOrcSchema);
+    Assert.assertEquals("Estimated average length of float must be 8.", 8, estimateLength);
+  }
+
+  @Test
+  public void testEstimateDoubleWidth() {
+    Schema doubleSchema = new Schema(DOUBLE_FIELD);
+    TypeDescription doubleOrcSchema = ORCSchemaUtil.convert(doubleSchema);
+    long estimateLength = getEstimateLength(doubleOrcSchema);
+    Assert.assertEquals("Estimated average length of double must be 8.", 8, estimateLength);
+  }
+
+  @Test
+  public void testEstimateDecimalWidth() {
+    Schema decimalSchema = new Schema(DECIMAL_FIELD);
+    TypeDescription decimalOrcSchema = ORCSchemaUtil.convert(decimalSchema);
+    long estimateLength = getEstimateLength(decimalOrcSchema);
+    Assert.assertEquals("Estimated average length of decimal must be 7.", 7, estimateLength);
+  }
+
+  @Test
+  public void testEstimateFixedWidth() {
+    Schema fixedSchema = new Schema(FIXED_FIELD);
+    TypeDescription fixedOrcSchema = ORCSchemaUtil.convert(fixedSchema);
+    long estimateLength = getEstimateLength(fixedOrcSchema);
+    Assert.assertEquals("Estimated average length of fixed must be 128.", 128, estimateLength);
+  }
+
+  @Test
+  public void testEstimateBinaryWidth() {
+    Schema binarySchema = new Schema(BINARY_FIELD);
+    TypeDescription binaryOrcSchema = ORCSchemaUtil.convert(binarySchema);
+    long estimateLength = getEstimateLength(binaryOrcSchema);
+    Assert.assertEquals("Estimated average length of binary must be 128.", 128, estimateLength);
+  }
+
+  @Test
+  public void testEstimateListWidth() {
+    Schema listSchema = new Schema(FLOAT_LIST_FIELD);
+    TypeDescription listOrcSchema = ORCSchemaUtil.convert(listSchema);
+    long estimateLength = getEstimateLength(listOrcSchema);
+    Assert.assertEquals("Estimated average length of list must be 8.", 8, estimateLength);
+  }
+
+  @Test
+  public void testEstimateLongWidth() {
+    Schema longSchema = new Schema(LONG_FIELD);
+    TypeDescription longOrcSchema = ORCSchemaUtil.convert(longSchema);
+    long estimateLength = getEstimateLength(longOrcSchema);
+    Assert.assertEquals("Estimated average length of long must be 8.", 8, estimateLength);
+  }
+
+  @Test
+  public void testEstimateBooleanWidth() {
+    Schema booleanSchema = new Schema(BOOLEAN_FIELD);
+    TypeDescription booleanOrcSchema = ORCSchemaUtil.convert(booleanSchema);
+    long estimateLength = getEstimateLength(booleanOrcSchema);
+    Assert.assertEquals("Estimated average length of boolean must be 8.", 8, estimateLength);
+  }
+
+  @Test
+  public void testEstimateTimestampWidth() {
+    Schema timestampZoneSchema = new Schema(TIMESTAMP_ZONE_FIELD);
+    TypeDescription timestampZoneOrcSchema = ORCSchemaUtil.convert(timestampZoneSchema);
+    long estimateLength = getEstimateLength(timestampZoneOrcSchema);
+    Assert.assertEquals("Estimated average length of timestamps with zone must be 12.", 12, estimateLength);
+
+    Schema timestampSchema = new Schema(TIMESTAMP_FIELD);
+    TypeDescription timestampOrcSchema = ORCSchemaUtil.convert(timestampSchema);
+    estimateLength = getEstimateLength(timestampOrcSchema);
+    Assert.assertEquals("Estimated average length of timestamp must be 12.", 12, estimateLength);
+  }
+
+  @Test
+  public void testEstimateDateWidth() {
+    Schema dateSchema = new Schema(DATE_FIELD);
+    TypeDescription dateOrcSchema = ORCSchemaUtil.convert(dateSchema);
+    long estimateLength = getEstimateLength(dateOrcSchema);
+    Assert.assertEquals("Estimated average length of date must be 8.", 8, estimateLength);
+  }
+
+  @Test
+  public void testEstimateUUIDWidth() {
+    Schema uuidSchema = new Schema(UUID_FIELD);
+    TypeDescription uuidOrcSchema = ORCSchemaUtil.convert(uuidSchema);
+    long estimateLength = getEstimateLength(uuidOrcSchema);
+    Assert.assertEquals("Estimated average length of uuid must be 128.", 128, estimateLength);
+  }
+
+  @Test
+  public void testEstimateMapWidth() {
+    Schema mapSchema = new Schema(MAP_FIELD_1);
+    TypeDescription mapOrcSchema = ORCSchemaUtil.convert(mapSchema);
+    long estimateLength = getEstimateLength(mapOrcSchema);
+    Assert.assertEquals("Estimated average length of map must be 136.", 136, estimateLength);
+  }
+
+  @Test
+  public void testEstimateStructWidth() {
+    Schema structSchema = new Schema(STRUCT_FIELD);
+    TypeDescription structOrcSchema = ORCSchemaUtil.convert(structSchema);
+    long estimateLength = getEstimateLength(structOrcSchema);
+    Assert.assertEquals("Estimated average length of struct must be 28.", 28, estimateLength);
+  }
+
+  @Test
+  public void testEstimateFullWidth() {
+    Schema fullSchema = new Schema(ID_FIELD, DATA_FIELD, FLOAT_FIELD, DOUBLE_FIELD, DECIMAL_FIELD, FIXED_FIELD,
+        BINARY_FIELD, FLOAT_LIST_FIELD, LONG_FIELD, MAP_FIELD_1, MAP_FIELD_2, STRUCT_FIELD);
+    TypeDescription fullOrcSchema = ORCSchemaUtil.convert(fullSchema);
+    long estimateLength = getEstimateLength(fullOrcSchema);
+    Assert.assertEquals("Estimated average length of the row must be 611.", 611, estimateLength);
+  }
+
+  private Integer getEstimateLength(TypeDescription orcSchemaWithDate) {
+    return OrcSchemaVisitor.visitSchema(orcSchemaWithDate, new EstimateOrcAvgWidthVisitor())
+        .stream().reduce(Integer::sum).orElse(0);
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index a669622..94f4614 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -42,7 +42,6 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ClusteredDataWriter;
@@ -636,13 +635,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
 
     private UnpartitionedDataWriter(SparkFileWriterFactory writerFactory, OutputFileFactory fileFactory,
                                     FileIO io, PartitionSpec spec, FileFormat format, long targetFileSize) {
-      // TODO: support ORC rolling writers
-      if (format == FileFormat.ORC) {
-        EncryptedOutputFile outputFile = fileFactory.newOutputFile();
-        delegate = writerFactory.newDataWriter(outputFile, spec, null);
-      } else {
-        delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
-      }
+      delegate = new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSize, spec, null);
       this.io = io;
     }
 
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 5e99dac..f924175 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -365,11 +365,9 @@ public class TestSparkDataWrite {
         files.add(file);
       }
     }
-    // TODO: ORC file now not support target file size
-    if (!format.equals(FileFormat.ORC)) {
-      Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
-      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
-    }
+
+    Assert.assertEquals("Should have 4 DataFiles", 4, files.size());
+    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
   @Test
@@ -585,11 +583,9 @@ public class TestSparkDataWrite {
         files.add(file);
       }
     }
-    // TODO: ORC file now not support target file size
-    if (!format.equals(FileFormat.ORC)) {
-      Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
-      Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
-    }
+
+    Assert.assertEquals("Should have 8 DataFiles", 8, files.size());
+    Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
   public enum IcebergOptionsType {