Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/08/05 18:02:29 UTC

[GitHub] [iceberg] rdsr commented on a change in pull request #1238: Refactor the SparkOrcWriter by using OrcSchemaWithTypeVisitor#visit

rdsr commented on a change in pull request #1238:
URL: https://github.com/apache/iceberg/pull/1238#discussion_r465907382



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
##########
@@ -20,399 +20,121 @@
 package org.apache.iceberg.spark.data;
 
 import java.util.List;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.orc.OrcRowWriter;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.HiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
 import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
 
 /**
  * This class acts as an adaptor from an OrcFileAppender to a
  * FileAppender&lt;InternalRow&gt;.
  */
 public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
 
-  private final Converter[] converters;
+  private final SparkOrcValueWriter writer;
 
-  public SparkOrcWriter(TypeDescription schema) {
-    converters = buildConverters(schema);
+  public SparkOrcWriter(Schema iSchema, TypeDescription orcSchema) {
+    Preconditions.checkArgument(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+        "Top level must be a struct " + orcSchema);
+
+    writer = OrcSchemaWithTypeVisitor.visit(iSchema, orcSchema, new WriteBuilder());
   }
 
   @Override
   public void write(InternalRow value, VectorizedRowBatch output) {
-    int row = output.size++;
-    for (int c = 0; c < converters.length; ++c) {
-      converters[c].addValue(row, c, value, output.cols[c]);
-    }
-  }
-
-  /**
-   * The interface for the conversion from Spark's SpecializedGetters to
-   * ORC's ColumnVectors.
-   */
-  interface Converter {
-    /**
-     * Take a value from the Spark data value and add it to the ORC output.
-     * @param rowId the row in the ColumnVector
-     * @param column either the column number or element number
-     * @param data either an InternalRow or ArrayData
-     * @param output the ColumnVector to put the value into
-     */
-    void addValue(int rowId, int column, SpecializedGetters data,
-                  ColumnVector output);
-  }
-
-  static class BooleanConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 1 : 0;
-      }
-    }
-  }
-
-  static class ByteConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getByte(column);
-      }
-    }
-  }
-
-  static class ShortConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getShort(column);
-      }
-    }
-  }
-
-  static class IntConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getInt(column);
-      }
-    }
-  }
-
-  static class LongConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getLong(column);
-      }
-    }
-  }
-
-  static class FloatConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
-      }
-    }
-  }
-
-  static class DoubleConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
-      }
-    }
-  }
-
-  static class StringConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        byte[] value = data.getUTF8String(column).getBytes();
-        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
-      }
-    }
-  }
+    Preconditions.checkArgument(writer instanceof StructWriter, "writer must be StructWriter");
 
-  static class BytesConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        // getBinary always makes a copy, so we don't need to worry about it
-        // being changed behind our back.
-        byte[] value = data.getBinary(column);
-        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
-      }
+    int row = output.size;
+    output.size += 1;
+    List<SparkOrcValueWriter> writers = ((StructWriter) writer).writers();
+    for (int c = 0; c < writers.size(); c++) {
+      SparkOrcValueWriter child = writers.get(c);
+      child.write(row, c, value, output.cols[c]);
     }
   }
 
-  static class TimestampTzConverter implements Converter {
-    @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        TimestampColumnVector cv = (TimestampColumnVector) output;
-        long micros = data.getLong(column);
-        cv.time[rowId] = micros / 1_000; // millis
-        cv.nanos[rowId] = (int) (micros % 1_000_000) * 1_000; // nanos
-      }
-    }
-  }
-
-  static class Decimal18Converter implements Converter {
-    private final int precision;
-    private final int scale;
-
-    Decimal18Converter(TypeDescription schema) {
-      precision = schema.getPrecision();
-      scale = schema.getScale();
+  private static class WriteBuilder extends OrcSchemaWithTypeVisitor<SparkOrcValueWriter> {
+    private WriteBuilder() {
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(
-            data.getDecimal(column, precision, scale).toUnscaledLong(), scale);
-      }
-    }
-  }
-
-  static class Decimal38Converter implements Converter {
-    private final int precision;
-    private final int scale;
-
-    Decimal38Converter(TypeDescription schema) {
-      precision = schema.getPrecision();
-      scale = schema.getScale();
+    public SparkOrcValueWriter record(Types.StructType iStruct, TypeDescription record,
+                                      List<String> names, List<SparkOrcValueWriter> fields) {
+      return new StructWriter(fields);
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DecimalColumnVector) output).vector[rowId].set(
-            HiveDecimal.create(data.getDecimal(column, precision, scale)
-                .toJavaBigDecimal()));
-      }
-    }
-  }
-
-  static class StructConverter implements Converter {
-    private final Converter[] children;
-
-    StructConverter(TypeDescription schema) {
-      children = new Converter[schema.getChildren().size()];
-      for (int c = 0; c < children.length; ++c) {
-        children[c] = buildConverter(schema.getChildren().get(c));
-      }
+    public SparkOrcValueWriter list(Types.ListType iList, TypeDescription array,
+                                    SparkOrcValueWriter element) {
+      return SparkOrcValueWriters.list(element);
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        InternalRow value = data.getStruct(column, children.length);
-        StructColumnVector cv = (StructColumnVector) output;
-        for (int c = 0; c < children.length; ++c) {
-          children[c].addValue(rowId, c, value, cv.fields[c]);
-        }
-      }
-    }
-  }
-
-  static class ListConverter implements Converter {
-    private final Converter children;
-
-    ListConverter(TypeDescription schema) {
-      children = buildConverter(schema.getChildren().get(0));
+    public SparkOrcValueWriter map(Types.MapType iMap, TypeDescription map,
+                                   SparkOrcValueWriter key, SparkOrcValueWriter value) {
+      return SparkOrcValueWriters.map(key, value);
     }
 
     @Override
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ArrayData value = data.getArray(column);
-        ListColumnVector cv = (ListColumnVector) output;
-        // record the length and start of the list elements
-        cv.lengths[rowId] = value.numElements();
-        cv.offsets[rowId] = cv.childCount;
-        cv.childCount += cv.lengths[rowId];
-        // make sure the child is big enough
-        cv.child.ensureSize(cv.childCount, true);
-        // Add each element
-        for (int e = 0; e < cv.lengths[rowId]; ++e) {
-          children.addValue((int) (e + cv.offsets[rowId]), e, value, cv.child);
-        }
-      }
-    }
-  }
-
-  static class MapConverter implements Converter {
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-
-    MapConverter(TypeDescription schema) {
-      keyConverter = buildConverter(schema.getChildren().get(0));
-      valueConverter = buildConverter(schema.getChildren().get(1));
+    public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
+      switch (primitive.getCategory()) {
+        case BOOLEAN:
+          return SparkOrcValueWriters.booleans();
+        case BYTE:
+          return SparkOrcValueWriters.bytes();
+        case SHORT:
+          return SparkOrcValueWriters.shorts();
+        case DATE:
+        case INT:
+          return SparkOrcValueWriters.ints();
+        case LONG:
+          return SparkOrcValueWriters.longs();
+        case FLOAT:
+          return SparkOrcValueWriters.floats();
+        case DOUBLE:
+          return SparkOrcValueWriters.doubles();
+        case BINARY:
+          return SparkOrcValueWriters.byteArrays();
+        case STRING:
+        case CHAR:
+        case VARCHAR:
+          return SparkOrcValueWriters.strings();
+        case DECIMAL:
+          return SparkOrcValueWriters.decimal(primitive.getPrecision(), primitive.getScale());
+        case TIMESTAMP_INSTANT:

Review comment:
       I think we need to take into account whether we need to adjust for UTC. This information is present in `iPrimitive`. I think we should dispatch on the `iPrimitive` type instead of the ORC primitive type, no?
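
       To make the suggestion concrete, here is a rough sketch of dispatching on the Iceberg type, assuming `SparkOrcValueWriters` grows timestamp factories (`timestampTz()` / `timestamps()` are placeholder names, not part of this PR):

       ```java
       @Override
       public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
         switch (iPrimitive.typeId()) {
           case TIMESTAMP: {
             // Iceberg's timestamp type records whether values are UTC-adjusted,
             // so the choice of writer can be made here instead of from the ORC category.
             Types.TimestampType timestampType = (Types.TimestampType) iPrimitive;
             return timestampType.shouldAdjustToUTC()
                 ? SparkOrcValueWriters.timestampTz()   // hypothetical writer name
                 : SparkOrcValueWriters.timestamps();   // hypothetical writer name
           }
           // ... the remaining Iceberg type IDs would be dispatched the same way
           default:
             throw new IllegalArgumentException("Unhandled type: " + iPrimitive);
         }
       }
       ```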






