You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/10 20:48:51 UTC

[incubator-iceberg] branch master updated: ORC: Make converters reusable (#195) (#209)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 864d790  ORC: Make converters reusable (#195) (#209)
864d790 is described below

commit 864d79041f09d44fa1e50a7aa3237344a2880e5a
Author: Ratandeep Ratti <rr...@linkedin.com>
AuthorDate: Mon Jun 10 13:48:47 2019 -0700

    ORC: Make converters reusable (#195) (#209)
---
 .../apache/iceberg/spark/data/SparkOrcReader.java  | 74 +++++++++++-----------
 1 file changed, 36 insertions(+), 38 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
index 301a550..aa0397d 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -60,16 +60,18 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
   private final static int INITIAL_SIZE = 128 * 1024;
   private final int numFields;
   private final TypeDescription readSchema;
+  private final Converter[] converters;
 
   public SparkOrcReader(Schema readSchema) {
     this.readSchema = TypeConversion.toOrc(readSchema, new ColumnIdMap());
     numFields = readSchema.columns().size();
+    converters = buildConverters();
   }
 
-  private Converter[] buildConverters(final UnsafeRowWriter writer) {
+  private Converter[] buildConverters() {
     final Converter[] converters = new Converter[numFields];
     for(int c = 0; c < numFields; ++c) {
-      converters[c] = buildConverter(writer, readSchema.getChildren().get(c));
+      converters[c] = buildConverter(readSchema.getChildren().get(c));
     }
     return converters;
   }
@@ -77,8 +79,6 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
   @Override
   public InternalRow read(VectorizedRowBatch batch, int row) {
     final UnsafeRowWriter rowWriter = new UnsafeRowWriter(numFields, INITIAL_SIZE);
-    final Converter[] converters = buildConverters(rowWriter);
-
     rowWriter.reset();
     rowWriter.zeroOutNullBytes();
     for(int c=0; c < batch.cols.length; ++c) {
@@ -565,17 +565,17 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
 
   private static class StructConverter implements Converter {
     private final Converter[] children;
-    private final UnsafeRowWriter childWriter;
 
-    StructConverter(final UnsafeWriter parentWriter, final TypeDescription schema) {
+    StructConverter(final TypeDescription schema) {
       children = new Converter[schema.getChildren().size()];
       for(int c=0; c < children.length; ++c) {
-        children[c] = buildConverter(parentWriter, schema.getChildren().get(c));
+        children[c] = buildConverter(schema.getChildren().get(c));
       }
-      childWriter = new UnsafeRowWriter(parentWriter, children.length);
+
     }
 
-    int writeStruct(StructColumnVector vector, int row) {
+    int writeStruct(UnsafeWriter parentWriter, StructColumnVector vector, int row) {
+      UnsafeRowWriter childWriter = new UnsafeRowWriter(parentWriter, children.length);
       int start = childWriter.cursor();
       childWriter.resetRowWriter();
       for(int c=0; c < children.length; ++c) {
@@ -592,7 +592,7 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
       if (!vector.noNulls && vector.isNull[row]) {
         writer.setNullAt(column);
       } else {
-        int start = writeStruct((StructColumnVector) vector, row);
+        int start = writeStruct(writer, (StructColumnVector) vector, row);
         writer.setOffsetAndSizeFromPreviousCursor(column, start);
       }
     }
@@ -606,30 +606,30 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
       if (!vector.noNulls && vector.isNull[row]) {
         writer.setNull(element);
       } else {
-        int start = writeStruct((StructColumnVector) vector, row);
+        int start = writeStruct(writer, (StructColumnVector) vector, row);
         writer.setOffsetAndSizeFromPreviousCursor(element, start);
       }
     }
   }
 
   private static class ListConverter implements Converter {
-    private final Converter children;
-    private final UnsafeArrayWriter childWriter;
-
-    ListConverter(final UnsafeWriter parentWriter, TypeDescription schema) {
-      TypeDescription child = schema.getChildren().get(0);
-      children = buildConverter(parentWriter, child);
-      childWriter = new UnsafeArrayWriter(parentWriter, getArrayElementSize(child));
+    private final Converter childConverter;
+    private final TypeDescription child;
 
+    ListConverter(TypeDescription schema) {
+      child = schema.getChildren().get(0);
+      childConverter = buildConverter(child);
     }
 
-    int writeList(ListColumnVector v, int row) {
+    int writeList(UnsafeWriter parentWriter, ListColumnVector v, int row) {
       int offset = (int) v.offsets[row];
       int length = (int) v.lengths[row];
+
+      UnsafeArrayWriter childWriter = new UnsafeArrayWriter(parentWriter, getArrayElementSize(child));
       int start = childWriter.cursor();
       childWriter.initialize(length);
       for(int c = 0; c < length; ++c) {
-        children.convert(childWriter, c, v.child, offset + c);
+        childConverter.convert(childWriter, c, v.child, offset + c);
       }
       return start;
     }
@@ -643,7 +643,7 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
       if (!vector.noNulls && vector.isNull[row]) {
         writer.setNullAt(column);
       } else {
-        int start = writeList((ListColumnVector) vector, row);
+        int start = writeList(writer, (ListColumnVector) vector, row);
         writer.setOffsetAndSizeFromPreviousCursor(column, start);
       }
     }
@@ -657,7 +657,7 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
       if (!vector.noNulls && vector.isNull[row]) {
         writer.setNull(element);
       } else {
-        int start = writeList((ListColumnVector) vector, row);
+        int start = writeList(writer, (ListColumnVector) vector, row);
         writer.setOffsetAndSizeFromPreviousCursor(element, start);
       }
     }
@@ -667,30 +667,27 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
     private final Converter keyConvert;
     private final Converter valueConvert;
 
-    private final UnsafeArrayWriter keyWriter;
-    private final UnsafeArrayWriter valueWriter;
-
     private final int keySize;
     private final int valueSize;
 
     private final int KEY_SIZE_BYTES = 8;
 
-    MapConverter(final UnsafeWriter parentWriter, TypeDescription schema) {
+    MapConverter(TypeDescription schema) {
       final TypeDescription keyType = schema.getChildren().get(0);
       final TypeDescription valueType = schema.getChildren().get(1);
-      keyConvert = buildConverter(parentWriter, keyType);
+      keyConvert = buildConverter(keyType);
       keySize = getArrayElementSize(keyType);
-      keyWriter = new UnsafeArrayWriter(parentWriter, keySize);
-      valueConvert = buildConverter(parentWriter, valueType);
+
+      valueConvert = buildConverter(valueType);
       valueSize = getArrayElementSize(valueType);
-      valueWriter = new UnsafeArrayWriter(parentWriter, valueSize);
     }
 
-    int writeMap(MapColumnVector v, int row) {
+    int writeMap(UnsafeWriter parentWriter, MapColumnVector v, int row) {
       final int offset = (int) v.offsets[row];
       final int length = (int) v.lengths[row];
-      final int start = keyWriter.cursor();
 
+      UnsafeArrayWriter keyWriter = new UnsafeArrayWriter(parentWriter, keySize);
+      final int start = keyWriter.cursor();
       // save room for the key size
       keyWriter.grow(KEY_SIZE_BYTES);
       keyWriter.increaseCursor(KEY_SIZE_BYTES);
@@ -705,6 +702,7 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
                 keyWriter.cursor() - start - KEY_SIZE_BYTES);
 
       // serialize the values
+      UnsafeArrayWriter valueWriter = new UnsafeArrayWriter(parentWriter, valueSize);
       valueWriter.initialize(length);
       for(int c = 0; c < length; ++c) {
         valueConvert.convert(valueWriter, c, v.values, offset + c);
@@ -720,7 +718,7 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
       if (!vector.noNulls && vector.isNull[row]) {
         writer.setNullAt(column);
       } else {
-        int start = writeMap((MapColumnVector) vector, row);
+        int start = writeMap(writer, (MapColumnVector) vector, row);
         writer.setOffsetAndSizeFromPreviousCursor(column, start);
       }
     }
@@ -733,13 +731,13 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
       if (!vector.noNulls && vector.isNull[row]) {
         writer.setNull(element);
       } else {
-        int start = writeMap((MapColumnVector) vector, row);
+        int start = writeMap(writer, (MapColumnVector) vector, row);
         writer.setOffsetAndSizeFromPreviousCursor(element, start);
       }
     }
   }
 
-  static Converter buildConverter(final UnsafeWriter writer, final TypeDescription schema) {
+  static Converter buildConverter(final TypeDescription schema) {
     switch (schema.getCategory()) {
       case BOOLEAN:
         return new BooleanConverter();
@@ -770,11 +768,11 @@ public class SparkOrcReader implements OrcValueReader<InternalRow> {
       case VARCHAR:
         return new BinaryConverter();
       case STRUCT:
-        return new StructConverter(writer, schema);
+        return new StructConverter(schema);
       case LIST:
-        return new ListConverter(writer, schema);
+        return new ListConverter(schema);
       case MAP:
-        return new MapConverter(writer, schema);
+        return new MapConverter(schema);
       default:
         throw new IllegalArgumentException("Unhandled type " + schema);
     }