Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/13 19:29:46 UTC

[GitHub] [iceberg] shardulm94 commented on a change in pull request #1197: Refactor the GenericOrcWriter by using OrcSchemaWithTypeVisitor#visit

shardulm94 commented on a change in pull request #1197:
URL: https://github.com/apache/iceberg/pull/1197#discussion_r453861043



##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
##########
@@ -19,590 +19,119 @@
 
 package org.apache.iceberg.data.orc;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
 import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.orc.ORCSchemaUtil;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
 import org.apache.iceberg.orc.OrcValueWriter;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.HiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
-import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 public class GenericOrcWriter implements OrcValueWriter<Record> {
-  private final Converter[] converters;
-  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
-  private GenericOrcWriter(TypeDescription schema) {
-    this.converters = buildConverters(schema);
-  }
-
-  public static OrcValueWriter<Record> buildWriter(TypeDescription fileSchema) {
-    return new GenericOrcWriter(fileSchema);
+  private final GenericOrcWriters.Converter converter;
+
+  private GenericOrcWriter(Schema expectedSchema, TypeDescription orcSchema) {
+    Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+        "Top level must be a struct " + orcSchema);
+
+    converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder());
+  }
+
+  public static OrcValueWriter<Record> buildWriter(Schema expectedSchema, TypeDescription fileSchema) {
+    return new GenericOrcWriter(expectedSchema, fileSchema);
+  }
+
+  private static class WriteBuilder extends OrcSchemaWithTypeVisitor<GenericOrcWriters.Converter> {
+    private WriteBuilder() {
+    }
+
+    public GenericOrcWriters.Converter record(Types.StructType iStruct, TypeDescription record,
+                                              List<String> names, List<GenericOrcWriters.Converter> fields) {
+      return new GenericOrcWriters.RecordConverter(fields);
+    }
+
+    public GenericOrcWriters.Converter list(Types.ListType iList, TypeDescription array,
+                                            GenericOrcWriters.Converter element) {
+      return new GenericOrcWriters.ListConverter(element);
+    }
+
+    public GenericOrcWriters.Converter map(Types.MapType iMap, TypeDescription map,
+                                           GenericOrcWriters.Converter key, GenericOrcWriters.Converter value) {
+      return new GenericOrcWriters.MapConverter(key, value);
+    }
+
+    public GenericOrcWriters.Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription schema) {
+      switch (schema.getCategory()) {
+        case BOOLEAN:
+          return GenericOrcWriters.booleans();
+        case BYTE:
+          return GenericOrcWriters.bytes();
+        case SHORT:
+          return GenericOrcWriters.shorts();

Review comment:
       Iceberg would not produce a TypeDescription with BYTE or SHORT types, so we should just throw an unsupported exception for these cases.
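
       For example, one way the suggestion could look (the exception type and message here are illustrative, not from the PR):

       case BYTE:
       case SHORT:
         throw new IllegalArgumentException(
             "ORC type is not supported by Iceberg: " + schema);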

##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
##########
@@ -19,590 +19,119 @@
 
 package org.apache.iceberg.data.orc;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
 import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.orc.ORCSchemaUtil;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
 import org.apache.iceberg.orc.OrcValueWriter;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.HiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
-import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 public class GenericOrcWriter implements OrcValueWriter<Record> {
-  private final Converter[] converters;
-  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
-  private GenericOrcWriter(TypeDescription schema) {
-    this.converters = buildConverters(schema);
-  }
-
-  public static OrcValueWriter<Record> buildWriter(TypeDescription fileSchema) {
-    return new GenericOrcWriter(fileSchema);
+  private final GenericOrcWriters.Converter converter;
+
+  private GenericOrcWriter(Schema expectedSchema, TypeDescription orcSchema) {
+    Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+        "Top level must be a struct " + orcSchema);
+
+    converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder());
+  }
+
+  public static OrcValueWriter<Record> buildWriter(Schema expectedSchema, TypeDescription fileSchema) {
+    return new GenericOrcWriter(expectedSchema, fileSchema);
+  }
+
+  private static class WriteBuilder extends OrcSchemaWithTypeVisitor<GenericOrcWriters.Converter> {
+    private WriteBuilder() {
+    }
+
+    public GenericOrcWriters.Converter record(Types.StructType iStruct, TypeDescription record,
+                                              List<String> names, List<GenericOrcWriters.Converter> fields) {
+      return new GenericOrcWriters.RecordConverter(fields);
+    }
+
+    public GenericOrcWriters.Converter list(Types.ListType iList, TypeDescription array,
+                                            GenericOrcWriters.Converter element) {
+      return new GenericOrcWriters.ListConverter(element);
+    }
+
+    public GenericOrcWriters.Converter map(Types.MapType iMap, TypeDescription map,
+                                           GenericOrcWriters.Converter key, GenericOrcWriters.Converter value) {
+      return new GenericOrcWriters.MapConverter(key, value);
+    }
+
+    public GenericOrcWriters.Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription schema) {
+      switch (schema.getCategory()) {
+        case BOOLEAN:
+          return GenericOrcWriters.booleans();
+        case BYTE:
+          return GenericOrcWriters.bytes();
+        case SHORT:
+          return GenericOrcWriters.shorts();
+        case DATE:
+          return GenericOrcWriters.dates();
+        case INT:
+          return GenericOrcWriters.ints();
+        case LONG:
+          String longAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE);
+          ORCSchemaUtil.LongType longType = longAttributeValue == null ? ORCSchemaUtil.LongType.LONG :
+              ORCSchemaUtil.LongType.valueOf(longAttributeValue);
+          switch (longType) {
+            case TIME:
+              return GenericOrcWriters.times();
+            case LONG:
+              return GenericOrcWriters.longs();
+            default:
+              throw new IllegalStateException("Unhandled Long type found in ORC type attribute: " + longType);
+          }
+        case FLOAT:
+          return GenericOrcWriters.floats();
+        case DOUBLE:
+          return GenericOrcWriters.doubles();
+        case BINARY:
+          String binaryAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE);
+          ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ? ORCSchemaUtil.BinaryType.BINARY :
+              ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue);
+          switch (binaryType) {
+            case UUID:
+              return GenericOrcWriters.uuids();
+            case FIXED:
+              return GenericOrcWriters.fixed();
+            case BINARY:
+              return GenericOrcWriters.binary();
+            default:
+              throw new IllegalStateException("Unhandled Binary type found in ORC type attribute: " + binaryType);
+          }

Review comment:
       Now that we have the Iceberg primitive type available, we can use it to decide whether this is UUID, FIXED or BINARY. That is similar to what we do in `GenericOrcReader`.
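
       A sketch of what that could look like, assuming the standard `Type.TypeID` values (illustrative only):

       case BINARY:
         switch (iPrimitive.typeId()) {
           case UUID:
             return GenericOrcWriters.uuids();
           case FIXED:
             return GenericOrcWriters.fixed();
           case BINARY:
             return GenericOrcWriters.binary();
           default:
             throw new IllegalArgumentException("Unsupported binary type: " + iPrimitive);
         }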

##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+
+public class GenericOrcWriters {
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  private GenericOrcWriters() {
+  }
+
+  /**
+   * The interface for the conversion from Spark's SpecializedGetters to
+   * ORC's ColumnVectors.
+   */
+  interface Converter<T> {
+
+    Class<T> getJavaClass();
+
+    /**
+     * Take a value from the Spark data value and add it to the ORC output.
+     *
+     * @param rowId  the row in the ColumnVector
+     * @param data   either an InternalRow or ArrayData
+     * @param output the ColumnVector to put the value into
+     */
+    void addValue(int rowId, T data, ColumnVector output);
+  }
+
+  public static Converter<Boolean> booleans() {
+    return BooleanConverter.INSTANCE;
+  }
+
+  public static Converter<Byte> bytes() {
+    return ByteConverter.INSTANCE;
+  }
+
+  public static Converter<Short> shorts() {
+    return ShortConverter.INSTANCE;
+  }
+
+  public static Converter<Integer> ints() {
+    return IntConverter.INSTANCE;
+  }
+
+  public static Converter<LocalTime> times() {
+    return TimeConverter.INSTANCE;
+  }
+
+  public static Converter<Long> longs() {
+    return LongConverter.INSTANCE;
+  }
+
+  public static Converter<Float> floats() {
+    return FloatConverter.INSTANCE;
+  }
+
+  public static Converter<Double> doubles() {
+    return DoubleConverter.INSTANCE;
+  }
+
+  public static Converter<String> strings() {
+    return StringConverter.INSTANCE;
+  }
+
+  public static Converter<ByteBuffer> binary() {
+    return BytesConverter.INSTANCE;
+  }
+
+  public static Converter<UUID> uuids() {
+    return UUIDConverter.INSTANCE;
+  }
+
+  public static Converter<byte[]> fixed() {
+    return FixedConverter.INSTANCE;
+  }
+
+  public static Converter<LocalDate> dates() {
+    return DateConverter.INSTANCE;
+  }
+
+  public static Converter<OffsetDateTime> timestampTz() {
+    return TimestampTzConverter.INSTANCE;
+  }
+
+  public static Converter<LocalDateTime> timestamp() {
+    return TimestampConverter.INSTANCE;
+  }
+
+  public static Converter<BigDecimal> decimal18(TypeDescription schema) {
+    return new Decimal18Converter(schema);

Review comment:
       Nit: Can we just pass in the (precision, scale) to the writer instead of the whole schema?
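
       For example, the factory signature could become (hypothetical):

       public static Converter<BigDecimal> decimal18(int precision, int scale) {
         return new Decimal18Converter(precision, scale);
       }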

##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+
+public class GenericOrcWriters {
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  private GenericOrcWriters() {
+  }
+
+  /**
+   * The interface for the conversion from Spark's SpecializedGetters to
+   * ORC's ColumnVectors.

Review comment:
       Remove references to Spark and SpecializedGetters.

##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+
+public class GenericOrcWriters {
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  private GenericOrcWriters() {
+  }
+
+  /**
+   * The interface for the conversion from Spark's SpecializedGetters to
+   * ORC's ColumnVectors.
+   */
+  interface Converter<T> {
+
+    Class<T> getJavaClass();
+
+    /**
+     * Take a value from the Spark data value and add it to the ORC output.

Review comment:
       Remove references to Spark.

##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+
+public class GenericOrcWriters {
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  private GenericOrcWriters() {
+  }
+
+  /**
+   * The interface for the conversion from Spark's SpecializedGetters to
+   * ORC's ColumnVectors.
+   */
+  interface Converter<T> {
+
+    Class<T> getJavaClass();
+
+    /**
+     * Take a value from the Spark data value and add it to the ORC output.
+     *
+     * @param rowId  the row in the ColumnVector
+     * @param data   either an InternalRow or ArrayData

Review comment:
       Remove references to Spark's InternalRow and ArrayData classes.
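
       Taken together with the similar comments above, the Javadoc could be reworded along these lines (suggested wording only):

       /**
        * Converts a value from Iceberg's data model and adds it to the ORC output.
        *
        * @param rowId  the row in the ColumnVector
        * @param data   the value to write
        * @param output the ColumnVector to put the value into
        */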

##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
##########
@@ -19,590 +19,119 @@
 
 package org.apache.iceberg.data.orc;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
 import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.orc.ORCSchemaUtil;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
 import org.apache.iceberg.orc.OrcValueWriter;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.HiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
-import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 public class GenericOrcWriter implements OrcValueWriter<Record> {
-  private final Converter[] converters;
-  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
-  private GenericOrcWriter(TypeDescription schema) {
-    this.converters = buildConverters(schema);
-  }
-
-  public static OrcValueWriter<Record> buildWriter(TypeDescription fileSchema) {
-    return new GenericOrcWriter(fileSchema);
+  private final GenericOrcWriters.Converter converter;
+
+  private GenericOrcWriter(Schema expectedSchema, TypeDescription orcSchema) {
+    Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+        "Top level must be a struct " + orcSchema);
+
+    converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder());
+  }
+
+  public static OrcValueWriter<Record> buildWriter(Schema expectedSchema, TypeDescription fileSchema) {
+    return new GenericOrcWriter(expectedSchema, fileSchema);
+  }
+
+  private static class WriteBuilder extends OrcSchemaWithTypeVisitor<GenericOrcWriters.Converter> {
+    private WriteBuilder() {
+    }
+
+    public GenericOrcWriters.Converter record(Types.StructType iStruct, TypeDescription record,
+                                              List<String> names, List<GenericOrcWriters.Converter> fields) {
+      return new GenericOrcWriters.RecordConverter(fields);
+    }
+
+    public GenericOrcWriters.Converter list(Types.ListType iList, TypeDescription array,
+                                            GenericOrcWriters.Converter element) {
+      return new GenericOrcWriters.ListConverter(element);
+    }
+
+    public GenericOrcWriters.Converter map(Types.MapType iMap, TypeDescription map,
+                                           GenericOrcWriters.Converter key, GenericOrcWriters.Converter value) {
+      return new GenericOrcWriters.MapConverter(key, value);
+    }
+
+    public GenericOrcWriters.Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription schema) {
+      switch (schema.getCategory()) {
+        case BOOLEAN:
+          return GenericOrcWriters.booleans();
+        case BYTE:
+          return GenericOrcWriters.bytes();
+        case SHORT:
+          return GenericOrcWriters.shorts();
+        case DATE:
+          return GenericOrcWriters.dates();
+        case INT:
+          return GenericOrcWriters.ints();
+        case LONG:
+          String longAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE);
+          ORCSchemaUtil.LongType longType = longAttributeValue == null ? ORCSchemaUtil.LongType.LONG :
+              ORCSchemaUtil.LongType.valueOf(longAttributeValue);

Review comment:
       Now that we have the Iceberg primitive type available, we can use it to decide whether this is a long or a time. That is similar to what we do in `GenericOrcReader`.
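
       For example (a sketch; `Type.TypeID.TIME` is the standard Iceberg type id):

       case LONG:
         return iPrimitive.typeId() == Type.TypeID.TIME ?
             GenericOrcWriters.times() : GenericOrcWriters.longs();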

##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java
##########
@@ -19,590 +19,119 @@
 
 package org.apache.iceberg.data.orc;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
 import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.orc.ORCSchemaUtil;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
 import org.apache.iceberg.orc.OrcValueWriter;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.HiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
-import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 public class GenericOrcWriter implements OrcValueWriter<Record> {
-  private final Converter[] converters;
-  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
-  private GenericOrcWriter(TypeDescription schema) {
-    this.converters = buildConverters(schema);
-  }
-
-  public static OrcValueWriter<Record> buildWriter(TypeDescription fileSchema) {
-    return new GenericOrcWriter(fileSchema);
+  private final GenericOrcWriters.Converter converter;
+
+  private GenericOrcWriter(Schema expectedSchema, TypeDescription orcSchema) {
+    Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+        "Top level must be a struct " + orcSchema);
+
+    converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, orcSchema, new WriteBuilder());
+  }
+
+  public static OrcValueWriter<Record> buildWriter(Schema expectedSchema, TypeDescription fileSchema) {
+    return new GenericOrcWriter(expectedSchema, fileSchema);
+  }
+
+  private static class WriteBuilder extends OrcSchemaWithTypeVisitor<GenericOrcWriters.Converter> {
+    private WriteBuilder() {
+    }
+
+    public GenericOrcWriters.Converter record(Types.StructType iStruct, TypeDescription record,
+                                              List<String> names, List<GenericOrcWriters.Converter> fields) {
+      return new GenericOrcWriters.RecordConverter(fields);
+    }
+
+    public GenericOrcWriters.Converter list(Types.ListType iList, TypeDescription array,
+                                            GenericOrcWriters.Converter element) {
+      return new GenericOrcWriters.ListConverter(element);
+    }
+
+    public GenericOrcWriters.Converter map(Types.MapType iMap, TypeDescription map,
+                                           GenericOrcWriters.Converter key, GenericOrcWriters.Converter value) {
+      return new GenericOrcWriters.MapConverter(key, value);
+    }
+
+    public GenericOrcWriters.Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription schema) {
+      switch (schema.getCategory()) {
+        case BOOLEAN:
+          return GenericOrcWriters.booleans();
+        case BYTE:
+          return GenericOrcWriters.bytes();
+        case SHORT:
+          return GenericOrcWriters.shorts();
+        case DATE:
+          return GenericOrcWriters.dates();
+        case INT:
+          return GenericOrcWriters.ints();
+        case LONG:
+          String longAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_LONG_TYPE_ATTRIBUTE);
+          ORCSchemaUtil.LongType longType = longAttributeValue == null ? ORCSchemaUtil.LongType.LONG :
+              ORCSchemaUtil.LongType.valueOf(longAttributeValue);
+          switch (longType) {
+            case TIME:
+              return GenericOrcWriters.times();
+            case LONG:
+              return GenericOrcWriters.longs();
+            default:
+              throw new IllegalStateException("Unhandled Long type found in ORC type attribute: " + longType);
+          }
+        case FLOAT:
+          return GenericOrcWriters.floats();
+        case DOUBLE:
+          return GenericOrcWriters.doubles();
+        case BINARY:
+          String binaryAttributeValue = schema.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE);
+          ORCSchemaUtil.BinaryType binaryType = binaryAttributeValue == null ? ORCSchemaUtil.BinaryType.BINARY :
+              ORCSchemaUtil.BinaryType.valueOf(binaryAttributeValue);
+          switch (binaryType) {
+            case UUID:
+              return GenericOrcWriters.uuids();
+            case FIXED:
+              return GenericOrcWriters.fixed();
+            case BINARY:
+              return GenericOrcWriters.binary();
+            default:
+              throw new IllegalStateException("Unhandled Binary type found in ORC type attribute: " + binaryType);
+          }
+        case STRING:
+        case CHAR:
+        case VARCHAR:
+          return GenericOrcWriters.strings();
+        case DECIMAL:
+          return schema.getPrecision() <= 18 ? GenericOrcWriters.decimal18(schema) :
+              GenericOrcWriters.decimal38(schema);

Review comment:
       Since we are rewriting this, should we consider putting the `if (precision <= 18) then THIS else THAT` logic in `GenericOrcWriters.decimals()` and not exposing Decimal18 and Decimal38 separately? This is similar to what we did recently for readers in this PR: https://github.com/apache/iceberg/blob/7d0cf1cafec9ff42d8e4cb5019185af00ff80d58/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java#L57
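
       A sketch of a combined factory, assuming the converters also take (precision, scale) as suggested above (names and bounds are illustrative):

       public static Converter<BigDecimal> decimals(int precision, int scale) {
         if (precision <= 18) {
           return new Decimal18Converter(precision, scale);
         } else if (precision <= 38) {
           return new Decimal38Converter(precision, scale);
         } else {
           throw new IllegalArgumentException("Invalid precision: " + precision);
         }
       }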

##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+
+public class GenericOrcWriters {
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  private GenericOrcWriters() {
+  }
+
+  /**
+   * The interface for the conversion from Spark's SpecializedGetters to
+   * ORC's ColumnVectors.
+   */
+  interface Converter<T> {
+
+    Class<T> getJavaClass();
+
+    /**
+     * Take a value from the Spark data value and add it to the ORC output.
+     *
+     * @param rowId  the row in the ColumnVector
+     * @param data   either an InternalRow or ArrayData
+     * @param output the ColumnVector to put the value into
+     */
+    void addValue(int rowId, T data, ColumnVector output);
+  }
+
+  public static Converter<Boolean> booleans() {
+    return BooleanConverter.INSTANCE;
+  }
+
+  public static Converter<Byte> bytes() {
+    return ByteConverter.INSTANCE;
+  }
+
+  public static Converter<Short> shorts() {
+    return ShortConverter.INSTANCE;
+  }
+
+  public static Converter<Integer> ints() {
+    return IntConverter.INSTANCE;
+  }
+
+  public static Converter<LocalTime> times() {
+    return TimeConverter.INSTANCE;
+  }
+
+  public static Converter<Long> longs() {
+    return LongConverter.INSTANCE;
+  }
+
+  public static Converter<Float> floats() {
+    return FloatConverter.INSTANCE;
+  }
+
+  public static Converter<Double> doubles() {
+    return DoubleConverter.INSTANCE;
+  }
+
+  public static Converter<String> strings() {
+    return StringConverter.INSTANCE;
+  }
+
+  public static Converter<ByteBuffer> binary() {
+    return BytesConverter.INSTANCE;
+  }
+
+  public static Converter<UUID> uuids() {
+    return UUIDConverter.INSTANCE;
+  }
+
+  public static Converter<byte[]> fixed() {
+    return FixedConverter.INSTANCE;
+  }
+
+  public static Converter<LocalDate> dates() {
+    return DateConverter.INSTANCE;
+  }
+
+  public static Converter<OffsetDateTime> timestampTz() {
+    return TimestampTzConverter.INSTANCE;
+  }
+
+  public static Converter<LocalDateTime> timestamp() {
+    return TimestampConverter.INSTANCE;
+  }
+
+  public static Converter<BigDecimal> decimal18(TypeDescription schema) {
+    return new Decimal18Converter(schema);
+  }
+
+  public static Converter<BigDecimal> decimal38(TypeDescription schema) {
+    return Decimal38Converter.INSTANCE;
+  }
+
+  private static class BooleanConverter implements Converter<Boolean> {
+    private static final Converter<Boolean> INSTANCE = new BooleanConverter();
+
+    @Override
+    public Class<Boolean> getJavaClass() {
+      return Boolean.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Boolean data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;

Review comment:
       This code is repeated for every converter. Consider doing something similar to the read side, where we put this logic in the Converter interface itself so that the implementations only need to handle the non-null cases:
   https://github.com/apache/iceberg/blob/809697a6bf56698b1bc52c5f039f2a190a58fed5/orc/src/main/java/org/apache/iceberg/orc/OrcValueReader.java#L25-L36
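
       On the write side that could look something like this (`nonNullWrite` is a hypothetical name, mirroring `OrcValueReader#nonNullRead`):

       interface Converter<T> {
         Class<T> getJavaClass();

         default void addValue(int rowId, T data, ColumnVector output) {
           if (data == null) {
             output.noNulls = false;
             output.isNull[rowId] = true;
           } else {
             output.isNull[rowId] = false;
             nonNullWrite(rowId, data, output);
           }
         }

         void nonNullWrite(int rowId, T data, ColumnVector output);
       }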

##########
File path: data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java
##########
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.orc;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+
+public class GenericOrcWriters {
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  private GenericOrcWriters() {
+  }
+
+  /**
+   * The interface for the conversion from Spark's SpecializedGetters to
+   * ORC's ColumnVectors.
+   */
+  interface Converter<T> {
+
+    Class<T> getJavaClass();
+
+    /**
+     * Take a value from the Spark data value and add it to the ORC output.
+     *
+     * @param rowId  the row in the ColumnVector
+     * @param data   either an InternalRow or ArrayData
+     * @param output the ColumnVector to put the value into
+     */
+    void addValue(int rowId, T data, ColumnVector output);
+  }
+
+  public static Converter<Boolean> booleans() {
+    return BooleanConverter.INSTANCE;
+  }
+
+  public static Converter<Byte> bytes() {
+    return ByteConverter.INSTANCE;
+  }
+
+  public static Converter<Short> shorts() {
+    return ShortConverter.INSTANCE;
+  }
+
+  public static Converter<Integer> ints() {
+    return IntConverter.INSTANCE;
+  }
+
+  public static Converter<LocalTime> times() {
+    return TimeConverter.INSTANCE;
+  }
+
+  public static Converter<Long> longs() {
+    return LongConverter.INSTANCE;
+  }
+
+  public static Converter<Float> floats() {
+    return FloatConverter.INSTANCE;
+  }
+
+  public static Converter<Double> doubles() {
+    return DoubleConverter.INSTANCE;
+  }
+
+  public static Converter<String> strings() {
+    return StringConverter.INSTANCE;
+  }
+
+  public static Converter<ByteBuffer> binary() {
+    return BytesConverter.INSTANCE;
+  }
+
+  public static Converter<UUID> uuids() {
+    return UUIDConverter.INSTANCE;
+  }
+
+  public static Converter<byte[]> fixed() {
+    return FixedConverter.INSTANCE;
+  }
+
+  public static Converter<LocalDate> dates() {
+    return DateConverter.INSTANCE;
+  }
+
+  public static Converter<OffsetDateTime> timestampTz() {
+    return TimestampTzConverter.INSTANCE;
+  }
+
+  public static Converter<LocalDateTime> timestamp() {
+    return TimestampConverter.INSTANCE;
+  }
+
+  public static Converter<BigDecimal> decimal18(TypeDescription schema) {
+    return new Decimal18Converter(schema);
+  }
+
+  public static Converter<BigDecimal> decimal38(TypeDescription schema) {
+    return Decimal38Converter.INSTANCE;
+  }
+
+  private static class BooleanConverter implements Converter<Boolean> {
+    private static final Converter<Boolean> INSTANCE = new BooleanConverter();
+
+    @Override
+    public Class<Boolean> getJavaClass() {
+      return Boolean.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Boolean data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data ? 1 : 0;
+      }
+    }
+  }
+
+  private static class ByteConverter implements Converter<Byte> {
+    private static final Converter<Byte> INSTANCE = new ByteConverter();
+
+    @Override
+    public Class<Byte> getJavaClass() {
+      return Byte.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Byte data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  private static class ShortConverter implements Converter<Short> {
+    private static final Converter<Short> INSTANCE = new ShortConverter();
+
+    @Override
+    public Class<Short> getJavaClass() {
+      return Short.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Short data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  private static class IntConverter implements Converter<Integer> {
+    private static final Converter<Integer> INSTANCE = new IntConverter();
+
+    @Override
+    public Class<Integer> getJavaClass() {
+      return Integer.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Integer data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  private static class TimeConverter implements Converter<LocalTime> {
+    private static final Converter<LocalTime> INSTANCE = new TimeConverter();
+
+    @Override
+    public Class<LocalTime> getJavaClass() {
+      return LocalTime.class;
+    }
+
+    @Override
+    public void addValue(int rowId, LocalTime data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data.toNanoOfDay() / 1_000;
+      }
+    }
+  }
+
+  private static class LongConverter implements Converter<Long> {
+    private static final Converter<Long> INSTANCE = new LongConverter();
+
+    @Override
+    public Class<Long> getJavaClass() {
+      return Long.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Long data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  private static class FloatConverter implements Converter<Float> {
+    private static final Converter<Float> INSTANCE = new FloatConverter();
+
+    @Override
+    public Class<Float> getJavaClass() {
+      return Float.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Float data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DoubleColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  private static class DoubleConverter implements Converter<Double> {
+    private static final Converter<Double> INSTANCE = new DoubleConverter();
+
+    @Override
+    public Class<Double> getJavaClass() {
+      return Double.class;
+    }
+
+    @Override
+    public void addValue(int rowId, Double data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DoubleColumnVector) output).vector[rowId] = data;
+      }
+    }
+  }
+
+  private static class StringConverter implements Converter<String> {
+    private static final Converter<String> INSTANCE = new StringConverter();
+
+    @Override
+    public Class<String> getJavaClass() {
+      return String.class;
+    }
+
+    @Override
+    public void addValue(int rowId, String data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        byte[] value = data.getBytes(StandardCharsets.UTF_8);
+        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
+      }
+    }
+  }
+
+  private static class BytesConverter implements Converter<ByteBuffer> {
+    private static final Converter<ByteBuffer> INSTANCE = new BytesConverter();
+
+    @Override
+    public Class<ByteBuffer> getJavaClass() {
+      return ByteBuffer.class;
+    }
+
+    @Override
+    public void addValue(int rowId, ByteBuffer data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((BytesColumnVector) output).setRef(rowId, data.array(), 0, data.array().length);
+      }
+    }
+  }
+
+  private static class UUIDConverter implements Converter<UUID> {
+    private static final Converter<UUID> INSTANCE = new UUIDConverter();
+
+    @Override
+    public Class<UUID> getJavaClass() {
+      return UUID.class;
+    }
+
+    @Override
+    public void addValue(int rowId, UUID data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ByteBuffer buffer = ByteBuffer.allocate(16);
+        buffer.putLong(data.getMostSignificantBits());
+        buffer.putLong(data.getLeastSignificantBits());
+        ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length);
+      }
+    }
+  }
+
+  private static class FixedConverter implements Converter<byte[]> {
+    private static final Converter<byte[]> INSTANCE = new FixedConverter();
+
+    @Override
+    public Class<byte[]> getJavaClass() {
+      return byte[].class;
+    }
+
+    @Override
+    public void addValue(int rowId, byte[] data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((BytesColumnVector) output).setRef(rowId, data, 0, data.length);
+      }
+    }
+  }
+
+  private static class DateConverter implements Converter<LocalDate> {
+    private static final Converter<LocalDate> INSTANCE = new DateConverter();
+
+    @Override
+    public Class<LocalDate> getJavaClass() {
+      return LocalDate.class;
+    }
+
+    @Override
+    public void addValue(int rowId, LocalDate data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((LongColumnVector) output).vector[rowId] = ChronoUnit.DAYS.between(EPOCH_DAY, data);
+      }
+    }
+  }
+
+  private static class TimestampTzConverter implements Converter<OffsetDateTime> {
+    private static final Converter<OffsetDateTime> INSTANCE = new TimestampTzConverter();
+
+    @Override
+    public Class<OffsetDateTime> getJavaClass() {
+      return OffsetDateTime.class;
+    }
+
+    @Override
+    public void addValue(int rowId, OffsetDateTime data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        TimestampColumnVector cv = (TimestampColumnVector) output;
+        cv.time[rowId] = data.toInstant().toEpochMilli(); // millis
+        cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision
+      }
+    }
+  }
+
+  private static class TimestampConverter implements Converter<LocalDateTime> {
+    private static final Converter<LocalDateTime> INSTANCE = new TimestampConverter();
+
+    @Override
+    public Class<LocalDateTime> getJavaClass() {
+      return LocalDateTime.class;
+    }
+
+    @Override
+    public void addValue(int rowId, LocalDateTime data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        TimestampColumnVector cv = (TimestampColumnVector) output;
+        cv.setIsUTC(true);
+        cv.time[rowId] = data.toInstant(ZoneOffset.UTC).toEpochMilli(); // millis
+        cv.nanos[rowId] = (data.getNano() / 1_000) * 1_000; // truncate nanos to only keep microsecond precision
+      }
+    }
+  }
+
+  private static class Decimal18Converter implements Converter<BigDecimal> {
+    private final int scale;
+
+    Decimal18Converter(TypeDescription schema) {
+      this.scale = schema.getScale();
+    }
+
+    @Override
+    public Class<BigDecimal> getJavaClass() {
+      return BigDecimal.class;
+    }
+
+    @Override
+    public void addValue(int rowId, BigDecimal data, ColumnVector output) {
+      // TODO: validate precision and scale from schema
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DecimalColumnVector) output).vector[rowId]
+            .setFromLongAndScale(data.unscaledValue().longValueExact(), scale);
+      }
+    }
+  }
+
+  private static class Decimal38Converter implements Converter<BigDecimal> {
+    private static final Converter<BigDecimal> INSTANCE = new Decimal38Converter();
+
+    @Override
+    public Class<BigDecimal> getJavaClass() {
+      return BigDecimal.class;
+    }
+
+    @Override
+    public void addValue(int rowId, BigDecimal data, ColumnVector output) {
+      // TODO: validate precision and scale from schema
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        ((DecimalColumnVector) output).vector[rowId].set(HiveDecimal.create(data, false));
+      }
+    }
+  }
+
+  public static class RecordConverter implements Converter<Record> {
+    private final List<Converter> converters;
+
+    RecordConverter(List<Converter> converters) {
+      this.converters = converters;
+    }
+
+    public List<Converter> converters() {
+      return converters;
+    }
+
+    @Override
+    public Class<Record> getJavaClass() {
+      return Record.class;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void addValue(int rowId, Record data, ColumnVector output) {
+      if (data == null) {
+        output.noNulls = false;
+        output.isNull[rowId] = true;
+      } else {
+        output.isNull[rowId] = false;
+        StructColumnVector cv = (StructColumnVector) output;
+        for (int c = 0; c < converters.size(); ++c) {
+          converters.get(c).addValue(rowId, data.get(c, converters.get(c).getJavaClass()), cv.fields[c]);
+        }
+      }
+    }
+  }
+
+  public static class ListConverter implements Converter<List> {
+    private final Converter children;

Review comment:
       Nit: rename to `elementConverter`? Similar to what we do for `MapConverter`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


