Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/24 07:54:50 UTC

[GitHub] [iceberg] openinx opened a new pull request #1238: Refactor the SparkOrcWriter by using OrcSchemaWithTypeVisitor#visit

openinx opened a new pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238


   This patch addresses the issues from https://github.com/apache/iceberg/issues/1236. Once https://github.com/apache/iceberg/pull/1197 is merged, I will rebase, and only the Spark ORC writer changes will remain in this PR.
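
For readers unfamiliar with the visitor approach named in the title, here is a minimal, self-contained sketch of the idea: walk a schema bottom-up and let each callback combine the results already built for its children. All types below (`Node`, `Visitor`, `DescribeWriters`) are hypothetical stand-ins invented for illustration; they are not Iceberg's `OrcSchemaWithTypeVisitor` or `SparkOrcValueWriter`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins; none of these types are Iceberg or ORC classes.
final class VisitorSketch {
  // A schema node is either a struct with children or a named primitive.
  static final class Node {
    final String name;          // non-null only for primitives
    final List<Node> children;  // non-empty only for structs

    private Node(String name, List<Node> children) {
      this.name = name;
      this.children = children;
    }

    static Node ofPrimitive(String name) {
      return new Node(name, new ArrayList<>());
    }

    static Node ofStruct(Node... fields) {
      return new Node(null, Arrays.asList(fields));
    }
  }

  // Callbacks receive the results already computed for the child nodes.
  interface Visitor<T> {
    T record(Node struct, List<T> fields);

    T primitive(Node primitive);
  }

  // Post-order traversal: visit the children first, then combine their results.
  static <T> T visit(Node node, Visitor<T> visitor) {
    if (node.name != null) {
      return visitor.primitive(node);
    }
    List<T> results = new ArrayList<>();
    for (Node child : node.children) {
      results.add(visit(child, visitor));
    }
    return visitor.record(node, results);
  }

  // Builds a textual description of a writer tree instead of real column writers.
  static final class DescribeWriters implements Visitor<String> {
    @Override
    public String record(Node struct, List<String> fields) {
      return "struct" + fields;
    }

    @Override
    public String primitive(Node primitive) {
      return primitive.name + "Writer";
    }
  }

  public static void main(String[] args) {
    Node schema = Node.ofStruct(Node.ofPrimitive("long"), Node.ofStruct(Node.ofPrimitive("string")));
    System.out.println(visit(schema, new DescribeWriters()));
  }
}
```

The point of the design is that the traversal lives in one place (`visit`), so each writer implementation only has to say what to build for a struct or a primitive.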


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] openinx commented on a change in pull request #1238: Refactor the SparkOrcWriter by using OrcSchemaWithTypeVisitor#visit

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238#discussion_r466141100



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
##########
@@ -20,399 +20,121 @@
 package org.apache.iceberg.spark.data;
 
 import java.util.List;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.orc.OrcRowWriter;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.HiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
 import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
 
 /**
  * This class acts as an adaptor from an OrcFileAppender to a
  * FileAppender&lt;InternalRow&gt;.
  */
 public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
 
-  private final Converter[] converters;
+  private final SparkOrcValueWriter writer;
 
-  public SparkOrcWriter(TypeDescription schema) {
-    converters = buildConverters(schema);
+  public SparkOrcWriter(Schema iSchema, TypeDescription orcSchema) {
+    Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+        "Top level must be a struct " + orcSchema);
+
+    writer = OrcSchemaWithTypeVisitor.visit(iSchema, orcSchema, new WriteBuilder());
   }
 
   @Override
   public void write(InternalRow value, VectorizedRowBatch output) {
-    int row = output.size++;
-    for (int c = 0; c < converters.length; ++c) {
-      converters[c].addValue(row, c, value, output.cols[c]);
-    }
-  }
-
-  /**
-   * The interface for the conversion from Spark's SpecializedGetters to
-   * ORC's ColumnVectors.
-   */
-  interface Converter {
-    /**
-     * Take a value from the Spark data value and add it to the ORC output.
-     * @param rowId the row in the ColumnVector
-     * @param column either the column number or element number
-     * @param data either an InternalRow or ArrayData
-     * @param output the ColumnVector to put the value into
-     */
-    void addValue(int rowId, int column, SpecializedGetters data,
-                  ColumnVector output);
-  }
-
-  static class BooleanConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 1 : 0;
-      }
-    }
-  }
-
-  static class ByteConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getByte(column);
-      }
-    }
-  }
-
-  static class ShortConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getShort(column);
-      }
-    }
-  }
-
-  static class IntConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getInt(column);
-      }
-    }
-  }
-
-  static class LongConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getLong(column);
-      }
-    }
-  }
-
-  static class FloatConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
-      }
-    }
-  }
-
-  static class DoubleConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
-      }
-    }
-  }
-
-  static class StringConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        byte[] value = data.getUTF8String(column).getBytes();
-        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
-      }
-    }
-  }
+    Preconditions.checkArgument(writer instanceof StructWriter, "writer must be StructWriter");
 
-  static class BytesConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        // getBinary always makes a copy, so we don't need to worry about it
-        // being changed behind our back.
-        byte[] value = data.getBinary(column);
-        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
-      }
+    int row = output.size;
+    output.size += 1;
+    List<SparkOrcValueWriter> writers = ((StructWriter) writer).writers();
+    for (int c = 0; c < writers.size(); c++) {
+      SparkOrcValueWriter child = writers.get(c);
+      child.write(row, c, value, output.cols[c]);
     }
   }
 
-  static class TimestampTzConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        TimestampColumnVector cv = (TimestampColumnVector) output;
-        long micros = data.getLong(column);
-        cv.time[rowId] = micros / 1_000; // millis
-        cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos
-      }
-    }
-  }
-
-  static class Decimal18Converter implements Converter {
-    private final int precision;
-    private final int scale;
-
-    Decimal18Converter(TypeDescription schema) {
-      precision = schema.getPrecision();
-      scale = schema.getScale();
+  private static class WriteBuilder extends OrcSchemaWithTypeVisitor<SparkOrcValueWriter> {
+    private WriteBuilder() {
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(
-            data.getDecimal(column, precision, scale).toUnscaledLong(), scale);
-      }
-    }
-  }
-
-  static class Decimal38Converter implements Converter {
-    private final int precision;
-    private final int scale;
-
-    Decimal38Converter(TypeDescription schema) {
-      precision = schema.getPrecision();
-      scale = schema.getScale();
+    public SparkOrcValueWriter record(Types.StructType iStruct, TypeDescription record,
+                                      List<String> names, List<SparkOrcValueWriter> fields) {
+      return new StructWriter(fields);
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DecimalColumnVector) output).vector[rowId].set(
-            HiveDecimal.create(data.getDecimal(column, precision, scale)
-                .toJavaBigDecimal()));
-      }
-    }
-  }
-
-  static class StructConverter implements Converter {
-    private final Converter[] children;
-
-    StructConverter(TypeDescription schema) {
-      children = new Converter[schema.getChildren().size()];
-      for (int c = 0; c < children.length; ++c) {
-        children[c] = buildConverter(schema.getChildren().get(c));
-      }
+    public SparkOrcValueWriter list(Types.ListType iList, TypeDescription array,
+                                    SparkOrcValueWriter element) {
+      return SparkOrcValueWriters.list(element);
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        InternalRow value = data.getStruct(column, children.length);
-        StructColumnVector cv = (StructColumnVector) output;
-        for (int c = 0; c < children.length; ++c) {
-          children[c].addValue(rowId, c, value, cv.fields[c]);
-        }
-      }
-    }
-  }
-
-  static class ListConverter implements Converter {
-    private final Converter children;
-
-    ListConverter(TypeDescription schema) {
-      children = buildConverter(schema.getChildren().get(0));
+    public SparkOrcValueWriter map(Types.MapType iMap, TypeDescription map,
+                                   SparkOrcValueWriter key, SparkOrcValueWriter value) {
+      return SparkOrcValueWriters.map(key, value);
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ArrayData value = data.getArray(column);
-        ListColumnVector cv = (ListColumnVector) output;
-        // record the length and start of the list elements
-        cv.lengths[rowId] = value.numElements();
-        cv.offsets[rowId] = cv.childCount;
-        cv.childCount += cv.lengths[rowId];
-        // make sure the child is big enough
-        cv.child.ensureSize(cv.childCount, true);
-        // Add each element
-        for (int e = 0; e < cv.lengths[rowId]; ++e) {
-          children.addValue((int) (e + cv.offsets[rowId]), e, value, cv.child);
-        }
-      }
-    }
-  }
-
-  static class MapConverter implements Converter {
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-
-    MapConverter(TypeDescription schema) {
-      keyConverter = buildConverter(schema.getChildren().get(0));
-      valueConverter = buildConverter(schema.getChildren().get(1));
+    public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
+      switch (primitive.getCategory()) {
+        case BOOLEAN:
+          return SparkOrcValueWriters.booleans();
+        case BYTE:
+          return SparkOrcValueWriters.bytes();
+        case SHORT:
+          return SparkOrcValueWriters.shorts();
+        case DATE:
+        case INT:
+          return SparkOrcValueWriters.ints();
+        case LONG:
+          return SparkOrcValueWriters.longs();
+        case FLOAT:
+          return SparkOrcValueWriters.floats();
+        case DOUBLE:
+          return SparkOrcValueWriters.doubles();
+        case BINARY:
+          return SparkOrcValueWriters.byteArrays();
+        case STRING:
+        case CHAR:
+        case VARCHAR:
+          return SparkOrcValueWriters.strings();
+        case DECIMAL:
+          return SparkOrcValueWriters.decimal(primitive.getPrecision(), primitive.getScale());
+        case TIMESTAMP_INSTANT:

Review comment:
       > So in this branch of the switch, the type is definitely timestamptz.
   
   Yes, in this `TIMESTAMP_INSTANT` case it must be a timestamp with time zone. See the `Category` enum in `TypeDescription`:
   
   ```java
   public enum Category {
       BOOLEAN("boolean", true),
       BYTE("tinyint", true),
       SHORT("smallint", true),
       INT("int", true),
       LONG("bigint", true),
       FLOAT("float", true),
       DOUBLE("double", true),
       STRING("string", true),
       DATE("date", true),
       TIMESTAMP("timestamp", true),
       BINARY("binary", true),
       DECIMAL("decimal", true),
       VARCHAR("varchar", true),
       CHAR("char", true),
       LIST("array", false),
       MAP("map", false),
       STRUCT("struct", false),
       UNION("uniontype", false),
       TIMESTAMP_INSTANT("timestamp with local time zone", false);
   ```
   
   By the way, is there any reason that Spark doesn't support a timestamp without time zone? I saw that the Spark test [here](https://github.com/apache/iceberg/blob/ffdcf09027e09460b7d7505e65aea119107934a3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java#L52) does not include the `timestamp without zone` data type. Maybe we could file a separate issue for `timestamp without zone`, since this is a refactoring PR and we could get it merged first.
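
To make the distinction concrete, the sketch below mirrors the shape of a switch on the ORC category for the two timestamp flavours. `OrcCategory`, `writerFor`, and the returned labels are hypothetical stand-ins rather than the ORC or Iceberg API, and rejecting `TIMESTAMP` with an exception is an assumption about how an unhandled category might be treated, not a description of this PR's default branch.

```java
final class TimestampDispatchSketch {
  // Hypothetical stand-in for a few entries of ORC's TypeDescription.Category.
  enum OrcCategory { LONG, TIMESTAMP, TIMESTAMP_INSTANT }

  // Returns a label for the writer that would be chosen; the labels are illustrative only.
  static String writerFor(OrcCategory category) {
    switch (category) {
      case LONG:
        return "longs";
      case TIMESTAMP_INSTANT:
        // Timestamp with time zone: the only timestamp flavour handled in this sketch.
        return "timestampTz";
      default:
        // Assumption: everything else, including TIMESTAMP (without zone), is rejected.
        throw new IllegalArgumentException("Unhandled type " + category);
    }
  }

  public static void main(String[] args) {
    System.out.println(writerFor(OrcCategory.TIMESTAMP_INSTANT));
  }
}
```

With this shape, supporting `timestamp without zone` later would amount to adding a `TIMESTAMP` case, which is why it can be tracked as a separate issue.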






[GitHub] [iceberg] rdsr commented on a change in pull request #1238: Refactor the SparkOrcWriter by using OrcSchemaWithTypeVisitor#visit

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238#discussion_r466152224



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
##########

Review comment:
   I was thinking that we should also handle `TIMESTAMP` and not just `TIMESTAMP_INSTANT`. But I checked with @shardulm94, and the reason is that Spark does not support timestamp without time zone.
   
   My other point was to dispatch on primitive so that it is consistent with other writers, e.g. `GenericOrcWriter`. But here it doesn't make much difference.
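
As a rough illustration of that second point, assuming "dispatch on primitive" means switching on the Iceberg primitive type instead of the ORC category: `IcebergTypeId`, `IcebergPrimitive`, and the writer labels below are hypothetical stand-ins, loosely modelled on Iceberg's type IDs and on the `shouldAdjustToUTC()` flag of its timestamp type. This is a sketch, not the code of `GenericOrcWriter`.

```java
final class IcebergDispatchSketch {
  // Hypothetical subset of Iceberg type IDs.
  enum IcebergTypeId { INTEGER, DATE, LONG, STRING, TIMESTAMP }

  // Hypothetical stand-in for an Iceberg primitive type.
  static final class IcebergPrimitive {
    final IcebergTypeId typeId;
    final boolean adjustToUtc; // only meaningful for TIMESTAMP

    IcebergPrimitive(IcebergTypeId typeId, boolean adjustToUtc) {
      this.typeId = typeId;
      this.adjustToUtc = adjustToUtc;
    }
  }

  // Chooses a writer label from the Iceberg type alone; the labels are illustrative only.
  static String writerFor(IcebergPrimitive type) {
    switch (type.typeId) {
      case INTEGER:
      case DATE:
        return "ints";
      case LONG:
        return "longs";
      case STRING:
        return "strings";
      case TIMESTAMP:
        if (type.adjustToUtc) {
          return "timestampTz";
        }
        // Assumption: timestamp without zone is rejected, since Spark does not support it.
        throw new UnsupportedOperationException("timestamp without zone is not supported");
      default:
        throw new IllegalArgumentException("Unhandled type " + type.typeId);
    }
  }

  public static void main(String[] args) {
    System.out.println(writerFor(new IcebergPrimitive(IcebergTypeId.TIMESTAMP, true)));
  }
}
```

Either style selects the same writers for the types Spark supports, which matches the remark that the choice makes little difference here.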
   
   Thanks for the explanation! 






[GitHub] [iceberg] rdsr commented on pull request #1238: Refactor the SparkOrcWriter by using OrcSchemaWithTypeVisitor#visit

Posted by GitBox <gi...@apache.org>.
rdsr commented on pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238#issuecomment-669328584


   Thanks @openinx. Looking into it today.




[GitHub] [iceberg] openinx commented on pull request #1238: Refactor the SparkOrcWriter by using OrcSchemaWithTypeVisitor#visit

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238#issuecomment-666272344


   Rebased to fix the conflicts and trigger the Travis tests.




[GitHub] [iceberg] rdblue commented on a change in pull request #1238: Refactor the SparkOrcWriter by using OrcSchemaWithTypeVisitor#visit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238#discussion_r463867737



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data;
+
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
+import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+
+class SparkOrcValueWriters {
+  private SparkOrcValueWriters() {
+  }
+
+  public static SparkOrcValueWriter booleans() {
+    return BooleanWriter.INSTANCE;
+  }
+
+  public static SparkOrcValueWriter bytes() {
+    return ByteWriter.INSTANCE;
+  }
+
+  public static SparkOrcValueWriter shorts() {
+    return ShortWriter.INSTANCE;
+  }
+
+  public static SparkOrcValueWriter ints() {
+    return IntWriter.INSTANCE;
+  }
+
+  public static SparkOrcValueWriter longs() {
+    return LongWriter.INSTANCE;
+  }
+
+  public static SparkOrcValueWriter floats() {
+    return FloatWriter.INSTANCE;
+  }
+
+  public static SparkOrcValueWriter doubles() {
+    return DoubleWriter.INSTANCE;
+  }
+
+  public static SparkOrcValueWriter byteArray() {

Review comment:
       Nit: this is the only one that isn't plural; it should be `byteArrays`.






[GitHub] [iceberg] rdsr commented on a change in pull request #1238: Refactor the SparkOrcWriter by using OrcSchemaWithTypeVisitor#visit

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238#discussion_r465907382



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
##########
@@ -20,399 +20,121 @@
 package org.apache.iceberg.spark.data;
 
 import java.util.List;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.orc.OrcRowWriter;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.HiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
 import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
 
 /**
  * This class acts as an adaptor from an OrcFileAppender to a
  * FileAppender&lt;InternalRow&gt;.
  */
 public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
 
-  private final Converter[] converters;
+  private final SparkOrcValueWriter writer;
 
-  public SparkOrcWriter(TypeDescription schema) {
-    converters = buildConverters(schema);
+  public SparkOrcWriter(Schema iSchema, TypeDescription orcSchema) {
+    Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+        "Top level must be a struct " + orcSchema);
+
+    writer = OrcSchemaWithTypeVisitor.visit(iSchema, orcSchema, new WriteBuilder());
   }
 
   @Override
   public void write(InternalRow value, VectorizedRowBatch output) {
-    int row = output.size++;
-    for (int c = 0; c < converters.length; ++c) {
-      converters[c].addValue(row, c, value, output.cols[c]);
-    }
-  }
-
-  /**
-   * The interface for the conversion from Spark's SpecializedGetters to
-   * ORC's ColumnVectors.
-   */
-  interface Converter {
-    /**
-     * Take a value from the Spark data value and add it to the ORC output.
-     * @param rowId the row in the ColumnVector
-     * @param column either the column number or element number
-     * @param data either an InternalRow or ArrayData
-     * @param output the ColumnVector to put the value into
-     */
-    void addValue(int rowId, int column, SpecializedGetters data,
-                  ColumnVector output);
-  }
-
-  static class BooleanConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 1 : 0;
-      }
-    }
-  }
-
-  static class ByteConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getByte(column);
-      }
-    }
-  }
-
-  static class ShortConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getShort(column);
-      }
-    }
-  }
-
-  static class IntConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getInt(column);
-      }
-    }
-  }
-
-  static class LongConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getLong(column);
-      }
-    }
-  }
-
-  static class FloatConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
-      }
-    }
-  }
-
-  static class DoubleConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
-      }
-    }
-  }
-
-  static class StringConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        byte[] value = data.getUTF8String(column).getBytes();
-        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
-      }
-    }
-  }
+    Preconditions.checkArgument(writer instanceof StructWriter, "writer must be StructWriter");
 
-  static class BytesConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        // getBinary always makes a copy, so we don't need to worry about it
-        // being changed behind our back.
-        byte[] value = data.getBinary(column);
-        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
-      }
+    int row = output.size;
+    output.size += 1;
+    List<SparkOrcValueWriter> writers = ((StructWriter) writer).writers();
+    for (int c = 0; c < writers.size(); c++) {
+      SparkOrcValueWriter child = writers.get(c);
+      child.write(row, c, value, output.cols[c]);
     }
   }
 
-  static class TimestampTzConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        TimestampColumnVector cv = (TimestampColumnVector) output;
-        long micros = data.getLong(column);
-        cv.time[rowId] = micros / 1_000; // millis
-        cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos
-      }
-    }
-  }
-
-  static class Decimal18Converter implements Converter {
-    private final int precision;
-    private final int scale;
-
-    Decimal18Converter(TypeDescription schema) {
-      precision = schema.getPrecision();
-      scale = schema.getScale();
+  private static class WriteBuilder extends OrcSchemaWithTypeVisitor<SparkOrcValueWriter> {
+    private WriteBuilder() {
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(
-            data.getDecimal(column, precision, scale).toUnscaledLong(), scale);
-      }
-    }
-  }
-
-  static class Decimal38Converter implements Converter {
-    private final int precision;
-    private final int scale;
-
-    Decimal38Converter(TypeDescription schema) {
-      precision = schema.getPrecision();
-      scale = schema.getScale();
+    public SparkOrcValueWriter record(Types.StructType iStruct, TypeDescription record,
+                                      List<String> names, List<SparkOrcValueWriter> fields) {
+      return new StructWriter(fields);
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DecimalColumnVector) output).vector[rowId].set(
-            HiveDecimal.create(data.getDecimal(column, precision, scale)
-                .toJavaBigDecimal()));
-      }
-    }
-  }
-
-  static class StructConverter implements Converter {
-    private final Converter[] children;
-
-    StructConverter(TypeDescription schema) {
-      children = new Converter[schema.getChildren().size()];
-      for (int c = 0; c < children.length; ++c) {
-        children[c] = buildConverter(schema.getChildren().get(c));
-      }
+    public SparkOrcValueWriter list(Types.ListType iList, TypeDescription array,
+                                    SparkOrcValueWriter element) {
+      return SparkOrcValueWriters.list(element);
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        InternalRow value = data.getStruct(column, children.length);
-        StructColumnVector cv = (StructColumnVector) output;
-        for (int c = 0; c < children.length; ++c) {
-          children[c].addValue(rowId, c, value, cv.fields[c]);
-        }
-      }
-    }
-  }
-
-  static class ListConverter implements Converter {
-    private final Converter children;
-
-    ListConverter(TypeDescription schema) {
-      children = buildConverter(schema.getChildren().get(0));
+    public SparkOrcValueWriter map(Types.MapType iMap, TypeDescription map,
+                                   SparkOrcValueWriter key, SparkOrcValueWriter value) {
+      return SparkOrcValueWriters.map(key, value);
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ArrayData value = data.getArray(column);
-        ListColumnVector cv = (ListColumnVector) output;
-        // record the length and start of the list elements
-        cv.lengths[rowId] = value.numElements();
-        cv.offsets[rowId] = cv.childCount;
-        cv.childCount += cv.lengths[rowId];
-        // make sure the child is big enough
-        cv.child.ensureSize(cv.childCount, true);
-        // Add each element
-        for (int e = 0; e < cv.lengths[rowId]; ++e) {
-          children.addValue((int) (e + cv.offsets[rowId]), e, value, cv.child);
-        }
-      }
-    }
-  }
-
-  static class MapConverter implements Converter {
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-
-    MapConverter(TypeDescription schema) {
-      keyConverter = buildConverter(schema.getChildren().get(0));
-      valueConverter = buildConverter(schema.getChildren().get(1));
+    public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
+      switch (primitive.getCategory()) {
+        case BOOLEAN:
+          return SparkOrcValueWriters.booleans();
+        case BYTE:
+          return SparkOrcValueWriters.bytes();
+        case SHORT:
+          return SparkOrcValueWriters.shorts();
+        case DATE:
+        case INT:
+          return SparkOrcValueWriters.ints();
+        case LONG:
+          return SparkOrcValueWriters.longs();
+        case FLOAT:
+          return SparkOrcValueWriters.floats();
+        case DOUBLE:
+          return SparkOrcValueWriters.doubles();
+        case BINARY:
+          return SparkOrcValueWriters.byteArrays();
+        case STRING:
+        case CHAR:
+        case VARCHAR:
+          return SparkOrcValueWriters.strings();
+        case DECIMAL:
+          return SparkOrcValueWriters.decimal(primitive.getPrecision(), primitive.getScale());
+        case TIMESTAMP_INSTANT:

Review comment:
       I think we need to take into account whether we need to adjust to UTC. This information is present in `iPrimitive`. I think we should dispatch on the Iceberg primitive type (`iPrimitive`) instead of the ORC primitive type, no?
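
As a rough sketch of what dispatching on the Iceberg type could look like, assuming the Iceberg timestamp type carries a flag saying whether values are adjusted to UTC: the class `IcebergTypeDispatchSketch`, the nested `TypeId` enum and `PrimitiveType` class, and the writer labels are all hypothetical stand-ins so the example is self-contained; they are not the Iceberg API.

```java
final class IcebergTypeDispatchSketch {
  // Hypothetical stand-in for Iceberg's primitive type IDs; only a few are modeled.
  enum TypeId { INTEGER, LONG, STRING, TIMESTAMP }

  // Hypothetical minimal primitive type that carries the UTC-adjustment flag.
  static final class PrimitiveType {
    final TypeId id;
    final boolean adjustToUtc;

    PrimitiveType(TypeId id, boolean adjustToUtc) {
      this.id = id;
      this.adjustToUtc = adjustToUtc;
    }
  }

  private IcebergTypeDispatchSketch() {
  }

  // Returns a writer label (hypothetical names) instead of a real writer object.
  static String writerFor(PrimitiveType type) {
    switch (type.id) {
      case INTEGER:
        return "ints";
      case LONG:
        return "longs";
      case STRING:
        return "strings";
      case TIMESTAMP:
        // The flag is only visible on the Iceberg side, which is the reason to dispatch here.
        if (type.adjustToUtc) {
          return "timestampTz";
        }
        throw new UnsupportedOperationException("Timestamp without zone is not supported");
      default:
        throw new IllegalArgumentException("Unhandled type " + type.id);
    }
  }

  public static void main(String[] args) {
    System.out.println(writerFor(new PrimitiveType(TypeId.TIMESTAMP, true)));
    System.out.println(writerFor(new PrimitiveType(TypeId.LONG, false)));
  }
}
```

Compared with a switch on the ORC category, this keeps the zone decision in one place, at the cost of the writer builder depending on both schemas.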






[GitHub] [iceberg] openinx commented on pull request #1238: Refactor the SparkOrcWriter by using OrcSchemaWithTypeVisitor#visit

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238#issuecomment-668483363


   Ping @rdsr @shardulm94 for review. Thanks.




[GitHub] [iceberg] rdsr merged pull request #1238: Refactor the SparkOrcWriter by using OrcSchemaWithTypeVisitor#visit

Posted by GitBox <gi...@apache.org>.
rdsr merged pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238


   




[GitHub] [iceberg] rdblue commented on pull request #1238: Refactor the SparkOrcWriter by using OrcSchemaWithTypeVisitor#visit

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238#issuecomment-667407729


   +1
   
   I think this looks good, but @rdsr or @shardulm94 may want to take a look also.




[GitHub] [iceberg] rdblue commented on a change in pull request #1238: Refactor the SparkOrcWriter by using OrcSchemaWithTypeVisitor#visit

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238#discussion_r465998582



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
##########
@@ -20,399 +20,121 @@
 package org.apache.iceberg.spark.data;
 
 import java.util.List;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.orc.OrcRowWriter;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.HiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
 import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
 
 /**
  * This class acts as an adaptor from an OrcFileAppender to a
  * FileAppender&lt;InternalRow&gt;.
  */
 public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
 
-  private final Converter[] converters;
+  private final SparkOrcValueWriter writer;
 
-  public SparkOrcWriter(TypeDescription schema) {
-    converters = buildConverters(schema);
+  public SparkOrcWriter(Schema iSchema, TypeDescription orcSchema) {
+    Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+        "Top level must be a struct " + orcSchema);
+
+    writer = OrcSchemaWithTypeVisitor.visit(iSchema, orcSchema, new WriteBuilder());
   }
 
   @Override
   public void write(InternalRow value, VectorizedRowBatch output) {
-    int row = output.size++;
-    for (int c = 0; c < converters.length; ++c) {
-      converters[c].addValue(row, c, value, output.cols[c]);
-    }
-  }
-
-  /**
-   * The interface for the conversion from Spark's SpecializedGetters to
-   * ORC's ColumnVectors.
-   */
-  interface Converter {
-    /**
-     * Take a value from the Spark data value and add it to the ORC output.
-     * @param rowId the row in the ColumnVector
-     * @param column either the column number or element number
-     * @param data either an InternalRow or ArrayData
-     * @param output the ColumnVector to put the value into
-     */
-    void addValue(int rowId, int column, SpecializedGetters data,
-                  ColumnVector output);
-  }
-
-  static class BooleanConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 1 : 0;
-      }
-    }
-  }
-
-  static class ByteConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getByte(column);
-      }
-    }
-  }
-
-  static class ShortConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getShort(column);
-      }
-    }
-  }
-
-  static class IntConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getInt(column);
-      }
-    }
-  }
-
-  static class LongConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getLong(column);
-      }
-    }
-  }
-
-  static class FloatConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
-      }
-    }
-  }
-
-  static class DoubleConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
-      }
-    }
-  }
-
-  static class StringConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        byte[] value = data.getUTF8String(column).getBytes();
-        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
-      }
-    }
-  }
+    Preconditions.checkArgument(writer instanceof StructWriter, "writer must be StructWriter");
 
-  static class BytesConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        // getBinary always makes a copy, so we don't need to worry about it
-        // being changed behind our back.
-        byte[] value = data.getBinary(column);
-        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
-      }
+    int row = output.size;
+    output.size += 1;
+    List<SparkOrcValueWriter> writers = ((StructWriter) writer).writers();
+    for (int c = 0; c < writers.size(); c++) {
+      SparkOrcValueWriter child = writers.get(c);
+      child.write(row, c, value, output.cols[c]);
     }
   }
 
-  static class TimestampTzConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        TimestampColumnVector cv = (TimestampColumnVector) output;
-        long micros = data.getLong(column);
-        cv.time[rowId] = micros / 1_000; // millis
-        cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos
-      }
-    }
-  }
-
-  static class Decimal18Converter implements Converter {
-    private final int precision;
-    private final int scale;
-
-    Decimal18Converter(TypeDescription schema) {
-      precision = schema.getPrecision();
-      scale = schema.getScale();
+  private static class WriteBuilder extends OrcSchemaWithTypeVisitor<SparkOrcValueWriter> {
+    private WriteBuilder() {
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(
-            data.getDecimal(column, precision, scale).toUnscaledLong(), scale);
-      }
-    }
-  }
-
-  static class Decimal38Converter implements Converter {
-    private final int precision;
-    private final int scale;
-
-    Decimal38Converter(TypeDescription schema) {
-      precision = schema.getPrecision();
-      scale = schema.getScale();
+    public SparkOrcValueWriter record(Types.StructType iStruct, TypeDescription record,
+                                      List<String> names, List<SparkOrcValueWriter> fields) {
+      return new StructWriter(fields);
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DecimalColumnVector) output).vector[rowId].set(
-            HiveDecimal.create(data.getDecimal(column, precision, scale)
-                .toJavaBigDecimal()));
-      }
-    }
-  }
-
-  static class StructConverter implements Converter {
-    private final Converter[] children;
-
-    StructConverter(TypeDescription schema) {
-      children = new Converter[schema.getChildren().size()];
-      for (int c = 0; c < children.length; ++c) {
-        children[c] = buildConverter(schema.getChildren().get(c));
-      }
+    public SparkOrcValueWriter list(Types.ListType iList, TypeDescription array,
+                                    SparkOrcValueWriter element) {
+      return SparkOrcValueWriters.list(element);
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        InternalRow value = data.getStruct(column, children.length);
-        StructColumnVector cv = (StructColumnVector) output;
-        for (int c = 0; c < children.length; ++c) {
-          children[c].addValue(rowId, c, value, cv.fields[c]);
-        }
-      }
-    }
-  }
-
-  static class ListConverter implements Converter {
-    private final Converter children;
-
-    ListConverter(TypeDescription schema) {
-      children = buildConverter(schema.getChildren().get(0));
+    public SparkOrcValueWriter map(Types.MapType iMap, TypeDescription map,
+                                   SparkOrcValueWriter key, SparkOrcValueWriter value) {
+      return SparkOrcValueWriters.map(key, value);
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ArrayData value = data.getArray(column);
-        ListColumnVector cv = (ListColumnVector) output;
-        // record the length and start of the list elements
-        cv.lengths[rowId] = value.numElements();
-        cv.offsets[rowId] = cv.childCount;
-        cv.childCount += cv.lengths[rowId];
-        // make sure the child is big enough
-        cv.child.ensureSize(cv.childCount, true);
-        // Add each element
-        for (int e = 0; e < cv.lengths[rowId]; ++e) {
-          children.addValue((int) (e + cv.offsets[rowId]), e, value, cv.child);
-        }
-      }
-    }
-  }
-
-  static class MapConverter implements Converter {
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-
-    MapConverter(TypeDescription schema) {
-      keyConverter = buildConverter(schema.getChildren().get(0));
-      valueConverter = buildConverter(schema.getChildren().get(1));
+    public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
+      switch (primitive.getCategory()) {
+        case BOOLEAN:
+          return SparkOrcValueWriters.booleans();
+        case BYTE:
+          return SparkOrcValueWriters.bytes();
+        case SHORT:
+          return SparkOrcValueWriters.shorts();
+        case DATE:
+        case INT:
+          return SparkOrcValueWriters.ints();
+        case LONG:
+          return SparkOrcValueWriters.longs();
+        case FLOAT:
+          return SparkOrcValueWriters.floats();
+        case DOUBLE:
+          return SparkOrcValueWriters.doubles();
+        case BINARY:
+          return SparkOrcValueWriters.byteArrays();
+        case STRING:
+        case CHAR:
+        case VARCHAR:
+          return SparkOrcValueWriters.strings();
+        case DECIMAL:
+          return SparkOrcValueWriters.decimal(primitive.getPrecision(), primitive.getScale());
+        case TIMESTAMP_INSTANT:

Review comment:
       I'm not quite sure what you mean here.
   
   Iceberg must never modify a value that is passed in, other than to convert to a different representation. Adjusting for time zones would produce corrupt values.
   
   It sounds like you're talking about the `adjust-to-utc` property that we set for Avro and Parquet, which tells us whether a timestamp is `with time zone` (adjust=true) or `without time zone` (adjust=false). But that property only determines the type; values are never modified because of it. We don't need the property for ORC because the ORC type already makes the distinction: TIMESTAMP_INSTANT is used for timestamp with time zone, and TIMESTAMP is used for timestamp without time zone. So in this branch of the switch, the type is definitely timestamptz.
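
To make "convert to a different representation" concrete, here is a minimal sketch, not the code in this PR. It assumes the incoming value is microseconds from the epoch (as in the removed `TimestampTzConverter`) and that the target is a millis-plus-nanos-of-second pair like the one `TimestampColumnVector` stores. The class and method names are hypothetical, and using floor division so that pre-epoch values round-trip is my own choice for the illustration. No time zone is applied anywhere: the same instant goes in and comes out.

```java
// Hypothetical helper for illustration only; not part of Iceberg or ORC.
class TimestampMicrosSketch {

  // Milliseconds from the epoch, rounded toward negative infinity.
  static long toMillis(long micros) {
    return Math.floorDiv(micros, 1_000L);
  }

  // Nanoseconds within the second, always in [0, 999_999_000].
  static int toNanosOfSecond(long micros) {
    return (int) Math.floorMod(micros, 1_000_000L) * 1_000;
  }

  // Inverse conversion: rebuild the original microsecond value.
  static long toMicros(long millis, int nanosOfSecond) {
    long seconds = Math.floorDiv(millis, 1_000L);
    return seconds * 1_000_000L + nanosOfSecond / 1_000;
  }

  public static void main(String[] args) {
    long micros = 1_595_577_290_123_456L;
    long millis = toMillis(micros);
    int nanos = toNanosOfSecond(micros);
    // Only the representation changes; the instant itself is untouched.
    System.out.println(millis + " ms, " + nanos + " ns -> " + toMicros(millis, nanos));
  }
}
```

The round trip returning the original value is the property that matters here: the writer changes how the instant is stored, never which instant it is.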



