Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/10 00:16:51 UTC

[GitHub] [iceberg] shardulm94 opened a new pull request #1189: Spark: Support ORC vectorized reads

shardulm94 opened a new pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189


   - Most of the new code is added in `VectorizedSparkOrcReaders`
   - Replaced `NullValuesColumnVector` with `ConstantColumnVector` which can support any constant (including nulls)
   - Moved the Iceberg to Spark constant conversion logic from `RowDataReader` into `BaseDataReader` so that it can be reused in `BatchDataReader`
   - Modified many of the Spark read test cases to test both the vectorized and non-vectorized codepaths
   - Modified `TestFilteredScan` in Spark3 to use IcebergGenerics instead of Avro, just like in Spark2. This enables us to test ORC reads/writes, which do not have an Avro GenericRecord writer.
   
   Benchmark results:
   Tests reads of 10 files with 5M records each
   ```
   Benchmark                                                                          Mode  Cnt    Score    Error  Units
   IcebergSourceFlatORCDataReadBenchmark.readFileSourceNonVectorized                    ss    5   42.789 ±  3.294   s/op
   IcebergSourceFlatORCDataReadBenchmark.readFileSourceVectorized                       ss    5   18.566 ±  1.450   s/op
   IcebergSourceFlatORCDataReadBenchmark.readIcebergNonVectorized                       ss    5   30.186 ±  1.007   s/op
   IcebergSourceFlatORCDataReadBenchmark.readIcebergVectorized                          ss    5   18.835 ±  0.818   s/op
   IcebergSourceFlatORCDataReadBenchmark.readWithProjectionFileSourceNonVectorized      ss    5    8.935 ±  0.801   s/op
   IcebergSourceFlatORCDataReadBenchmark.readWithProjectionFileSourceVectorized         ss    5    2.387 ±  0.195   s/op
   IcebergSourceFlatORCDataReadBenchmark.readWithProjectionIcebergNonVectorized         ss    5   10.691 ±  0.603   s/op
   IcebergSourceFlatORCDataReadBenchmark.readWithProjectionIcebergVectorized            ss    5    2.653 ±  0.511   s/op
   IcebergSourceNestedORCDataReadBenchmark.readFileSourceNonVectorized                  ss    5  118.318 ±  1.583   s/op
   IcebergSourceNestedORCDataReadBenchmark.readFileSourceVectorized                     ss    5   98.858 ± 12.668   s/op
   IcebergSourceNestedORCDataReadBenchmark.readIcebergNonVectorized                     ss    5   18.943 ±  1.305   s/op
   IcebergSourceNestedORCDataReadBenchmark.readIcebergVectorized                        ss    5    9.330 ±  0.938   s/op
   IcebergSourceNestedORCDataReadBenchmark.readWithProjectionFileSourceNonVectorized    ss    5   86.136 ±  1.139   s/op
   IcebergSourceNestedORCDataReadBenchmark.readWithProjectionFileSourceVectorized       ss    5   81.307 ± 14.090   s/op
   IcebergSourceNestedORCDataReadBenchmark.readWithProjectionIcebergNonVectorized       ss    5   16.671 ±  0.855   s/op
   IcebergSourceNestedORCDataReadBenchmark.readWithProjectionIcebergVectorized          ss    5    6.710 ±  0.679   s/op
   ```
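
As a reading aid, the Iceberg rows of the table can be reduced to speedup ratios (non-vectorized time divided by vectorized time). The sketch below only does that arithmetic on the scores copied from the table; the class and method names are made up for illustration and are not part of the PR. The error margins in the table are ignored, so the ratios are approximate.

```java
public class SpeedupSketch {
  // Ratio of non-vectorized to vectorized time; values above 1.0 mean the vectorized read is faster.
  static double speedup(double nonVectorizedSeconds, double vectorizedSeconds) {
    return nonVectorizedSeconds / vectorizedSeconds;
  }

  public static void main(String[] args) {
    // Scores (s/op) copied from the readIceberg* rows of the benchmark table.
    System.out.printf("flat, full read:     %.2fx%n", speedup(30.186, 18.835));
    System.out.printf("flat, projection:    %.2fx%n", speedup(10.691, 2.653));
    System.out.printf("nested, full read:   %.2fx%n", speedup(18.943, 9.330));
    System.out.printf("nested, projection:  %.2fx%n", speedup(16.671, 6.710));
  }
}
```

With these inputs the ratios come out at roughly 1.6x, 4.0x, 2.0x and 2.5x respectively.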


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] shardulm94 commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453126526



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
##########
@@ -42,19 +42,26 @@
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
 
-
-class SparkOrcValueReaders {
+public class SparkOrcValueReaders {

Review comment:
       I wanted to avoid duplicating the `if (precision < 18) then Decimal18Reader else Decimal38Reader` logic, since I am reusing it in `VectorizedSparkOrcReaders`. Similarly, I made the `utf8String` and `timestampTzs` methods public for reuse in the `vectorized` package. Let me know if you would like me to revert the Decimal change.
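
For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of a precision-based reader factory of the kind described above. It is not the Iceberg code: `DecimalReader`, `decimals` and the reader names are hypothetical stand-ins, and the cutoff is written as `precision <= 18` on the assumption that the point of the split is that an unscaled value of up to 18 decimal digits always fits in a `long`.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalReaderChoice {
  // Hypothetical stand-in for a value reader; not an Iceberg or ORC interface.
  interface DecimalReader {
    BigDecimal read(BigInteger unscaled);

    String name();
  }

  // Single place that picks the reader, so row-based and vectorized callers can share it.
  static DecimalReader decimals(int precision, int scale) {
    if (precision <= 18) {
      // Up to 18 digits: the unscaled value fits in a long (assumed reason for the split).
      return new DecimalReader() {
        @Override
        public BigDecimal read(BigInteger unscaled) {
          return BigDecimal.valueOf(unscaled.longValueExact(), scale);
        }

        @Override
        public String name() {
          return "Decimal18";
        }
      };
    } else if (precision <= 38) {
      // Wider decimals keep the arbitrary-precision representation.
      return new DecimalReader() {
        @Override
        public BigDecimal read(BigInteger unscaled) {
          return new BigDecimal(unscaled, scale);
        }

        @Override
        public String name() {
          return "Decimal38";
        }
      };
    }
    throw new IllegalArgumentException("Invalid precision: " + precision);
  }
}
```

Keeping the branch inside one factory method means a caller such as a vectorized reader only needs `decimals(precision, scale)` and never repeats the precision check.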






[GitHub] [iceberg] rdblue commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453123702



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.orc.OrcBatchReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.SparkOrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class VectorizedSparkOrcReaders {
+
+  private VectorizedSparkOrcReaders() {
+  }
+
+  public static OrcBatchReader<ColumnarBatch> buildReader(Schema expectedSchema, TypeDescription fileSchema,
+      Map<Integer, ?> idToConstant) {

Review comment:
       Nit: we would normally align this with the start of `Schema` on the previous line.






[GitHub] [iceberg] rdblue commented on pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#issuecomment-656873475


   Do we know why the nested data benchmarks show such a big improvement? Are we running correctness tests for the same cases to make sure we aren't dropping data by accident? It just seems a bit too good.




[GitHub] [iceberg] rdblue commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453126761



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
##########
@@ -42,19 +42,26 @@
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
 
-
-class SparkOrcValueReaders {
+public class SparkOrcValueReaders {

Review comment:
       Okay, I think that's a good justification for including this. Thanks!






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453143168



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.orc.OrcBatchReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.SparkOrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class VectorizedSparkOrcReaders {
+
+  private VectorizedSparkOrcReaders() {
+  }
+
+  public static OrcBatchReader<ColumnarBatch> buildReader(Schema expectedSchema, TypeDescription fileSchema,
+      Map<Integer, ?> idToConstant) {
+    Converter converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, fileSchema, new ReadBuilder(idToConstant));
+
+    return batch -> {
+      BaseOrcColumnVector cv = (BaseOrcColumnVector) converter.convert(new StructColumnVector(batch.size, batch.cols),
+          batch.size);
+      ColumnarBatch columnarBatch = new ColumnarBatch(IntStream.range(0, expectedSchema.columns().size())
+          .mapToObj(cv::getChild)
+          .toArray(ColumnVector[]::new));
+      columnarBatch.setNumRows(batch.size);
+      return columnarBatch;
+    };
+  }
+
+  private interface Converter {
+    ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector columnVector, int batchSize);
+  }
+
+  private static class ReadBuilder extends OrcSchemaWithTypeVisitor<Converter> {
+    private final Map<Integer, ?> idToConstant;
+
+    private ReadBuilder(Map<Integer, ?> idToConstant) {
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public Converter record(Types.StructType iStruct, TypeDescription record, List<String> names,
+        List<Converter> fields) {
+      return new StructConverter(iStruct, fields, idToConstant);
+    }
+
+    @Override
+    public Converter list(Types.ListType iList, TypeDescription array, Converter element) {
+      return new ArrayConverter(iList, element);
+    }
+
+    @Override
+    public Converter map(Types.MapType iMap, TypeDescription map, Converter key, Converter value) {
+      return new MapConverter(iMap, key, value);
+    }
+
+    @Override
+    public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
+      final OrcValueReader<?> primitiveValueReader;
+      switch (primitive.getCategory()) {
+        case BOOLEAN:
+          primitiveValueReader = OrcValueReaders.booleans();
+          break;
+        case BYTE:
+          // Iceberg does not have a byte type. Use int
+        case SHORT:
+          // Iceberg does not have a short type. Use int
+        case DATE:
+        case INT:
+          primitiveValueReader = OrcValueReaders.ints();
+          break;
+        case LONG:
+          primitiveValueReader = OrcValueReaders.longs();
+          break;
+        case FLOAT:
+          primitiveValueReader = OrcValueReaders.floats();
+          break;
+        case DOUBLE:
+          primitiveValueReader = OrcValueReaders.doubles();
+          break;
+        case TIMESTAMP_INSTANT:
+          primitiveValueReader = SparkOrcValueReaders.timestampTzs();
+          break;
+        case DECIMAL:
+          primitiveValueReader = SparkOrcValueReaders.decimals(primitive.getPrecision(), primitive.getScale());
+          break;
+        case CHAR:
+        case VARCHAR:
+        case STRING:
+          primitiveValueReader = SparkOrcValueReaders.utf8String();
+          break;
+        case BINARY:
+          primitiveValueReader = OrcValueReaders.bytes();
+          break;
+        default:
+          throw new IllegalArgumentException("Unhandled type " + primitive);
+      }
+      return (columnVector, batchSize) ->
+          new PrimitiveOrcColumnVector(iPrimitive, batchSize, columnVector, primitiveValueReader);
+    }
+  }
+
+  private abstract static class BaseOrcColumnVector extends ColumnVector {
+    private final org.apache.orc.storage.ql.exec.vector.ColumnVector vector;
+    private final int batchSize;
+    private Integer numNulls;
+
+    BaseOrcColumnVector(Type type, int batchSize, org.apache.orc.storage.ql.exec.vector.ColumnVector vector) {
+      super(SparkSchemaUtil.convert(type));
+      this.vector = vector;
+      this.batchSize = batchSize;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public boolean hasNull() {
+      return !vector.noNulls;
+    }
+
+    @Override
+    public int numNulls() {
+      if (numNulls == null) {
+        numNulls = numNullsHelper();
+      }
+      return numNulls;
+    }
+
+    private int numNullsHelper() {
+      if (vector.isRepeating) {
+        if (vector.isNull[0]) {
+          return batchSize;
+        } else {
+          return 0;
+        }
+      } else if (vector.noNulls) {
+        return 0;
+      } else {
+        int count = 0;
+        for (int i = 0; i < batchSize; i++) {
+          if (vector.isNull[i]) {
+            count++;
+          }
+        }
+        return count;
+      }
+    }
+
+    protected int getRowIndex(int rowId) {
+      return vector.isRepeating ? 0 : rowId;
+    }
+
+    @Override
+    public boolean isNullAt(int rowId) {
+      return vector.isNull[getRowIndex(rowId)];
+    }
+
+    @Override
+    public boolean getBoolean(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte getByte(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public short getShort(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getInt(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getLong(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public float getFloat(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public double getDouble(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Decimal getDecimal(int rowId, int precision, int scale) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public UTF8String getUTF8String(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte[] getBinary(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ColumnarArray getArray(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ColumnarMap getMap(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ColumnVector getChild(int ordinal) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class PrimitiveOrcColumnVector extends BaseOrcColumnVector {
+    private final org.apache.orc.storage.ql.exec.vector.ColumnVector vector;
+    private final OrcValueReader<?> primitiveValueReader;
+
+    PrimitiveOrcColumnVector(Type type, int batchSize, org.apache.orc.storage.ql.exec.vector.ColumnVector vector,
+        OrcValueReader<?> primitiveValueReader) {
+      super(type, batchSize, vector);
+      this.vector = vector;
+      this.primitiveValueReader = primitiveValueReader;
+    }
+
+    @Override
+    public boolean getBoolean(int rowId) {
+      Boolean value = (Boolean) primitiveValueReader.read(vector, rowId);
+      return value != null ? value : false;
+    }
+
+    @Override
+    public int getInt(int rowId) {
+      Integer value = (Integer) primitiveValueReader.read(vector, rowId);
+      return value != null ? value : 0;

Review comment:
       Yep! That makes sense. If the reader is not going to check for null, then returning a default value would be erroneous. One issue I see is that Spark's own ColumnVector code seems to violate this at https://github.com/apache/spark/blob/d5b903e38556ee3e8e1eb8f71a08e232afa4e36a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java#L92 and similar methods within the same class.
   In Spark 2.4 these methods are not actually used: they are called only from https://github.com/apache/spark/blob/d5b903e38556ee3e8e1eb8f71a08e232afa4e36a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java#L53, and those callers are not used anywhere.
   
   In Spark 3.0, they are used at https://github.com/apache/spark/blob/3b0aee3f9557ae1666b63665b08d899fe7682852/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java#L55, but this seems incorrect for the reasons you mentioned above. I am unsure when the `copy` method would actually be triggered though.
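
To make the concern concrete, here is a small self-contained sketch. It does not use Spark's `ColumnVector`; `NullAwareIntColumn` and its methods are hypothetical and only model the two behaviours under discussion. A getter that substitutes a default for null makes a null slot indistinguishable from a real `0` whenever the caller skips the `isNullAt` check, while a strict getter surfaces the mistake.

```java
public class NullAwareIntColumn {
  private final Integer[] values;

  NullAwareIntColumn(Integer... values) {
    this.values = values;
  }

  boolean isNullAt(int rowId) {
    return values[rowId] == null;
  }

  // Mirrors the `value != null ? value : 0` pattern: null silently becomes 0.
  int getIntOrDefault(int rowId) {
    Integer value = values[rowId];
    return value != null ? value : 0;
  }

  // Alternative: treat reading a null slot as a caller error.
  int getIntStrict(int rowId) {
    Integer value = values[rowId];
    if (value == null) {
      throw new IllegalStateException("Row " + rowId + " is null; check isNullAt first");
    }
    return value;
  }

  // A caller that honours the contract: test for null before reading the primitive.
  static long sumNonNull(NullAwareIntColumn column, int numRows) {
    long sum = 0;
    for (int i = 0; i < numRows; i++) {
      if (!column.isNullAt(i)) {
        sum += column.getIntStrict(i);
      }
    }
    return sum;
  }
}
```

For a column holding `1, null, 0`, `getIntOrDefault` returns `0` for both the second and third rows, so only `isNullAt` can tell them apart.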
   
   






[GitHub] [iceberg] rdblue commented on pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#issuecomment-656919437


   > For nested data the readFileSourceVectorized and readWithProjectionFileSourceVectorized are not really relevant since the file source defaults to row by row reading for nested data
   
   That's the answer I was looking for. It was really surprising that Spark vectorized was so slow. Thanks!




[GitHub] [iceberg] rdblue commented on pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#issuecomment-656420721


   Thanks, @shardulm94! It's great to see a PR for this. I'll try to get some time to review it.




[GitHub] [iceberg] rdblue commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453137096



##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
##########
@@ -130,4 +140,25 @@ public T next() {
     }
   }
 
+  private static class OrcBatchIterator<T> implements Iterator<T> {

Review comment:
       > This Iterator is converted to a CloseableIterator at
   
   Yes, it is converted to a `CloseableIterator`, but that iterator's close method won't do anything unless this is `Closeable`. Here's where it delegates:
   
   ```java
         @Override
         public void close() throws IOException {
           if (iterator instanceof Closeable) {
             ((Closeable) iterator).close();
           }
         }
   ```
   
   So if this `Iterator` doesn't implement `Closeable` to pass the close call on to `batchIter`, then `batchIter` won't get closed.
   
   Alternatively, you could add a `transform` factory method to `CloseableIterator`:
   
   ```java
     static <S, T> CloseableIterator<T> transform(CloseableIterator<S> iter, Function<S, T> transform) {
       return new CloseableIterator<T>() {
         @Override
         public void close() throws IOException {
           iter.close();
         }
   
         @Override
         public boolean hasNext() {
           return iter.hasNext();
         }
   
         @Override
         public T next() {
           return transform.apply(iter.next());
         }
       };
     }
   ```
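
The difference between the two approaches can be checked in isolation. The sketch below is self-contained and uses hypothetical names throughout (`CloseableIter`, `TrackedSource`, `plainTransform`, `closingTransform`, `closeIfCloseable`); it imitates the `instanceof Closeable` delegation quoted above rather than calling Iceberg's `CloseableIterator`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

public class CloseDelegationSketch {
  // Hypothetical stand-in for an iterator that can also be closed.
  interface CloseableIter<T> extends Iterator<T>, Closeable {
  }

  // Source that records whether close() ever reached it.
  static class TrackedSource implements CloseableIter<Integer> {
    private final Iterator<Integer> delegate = Arrays.asList(1, 2, 3).iterator();
    boolean closed = false;

    @Override
    public boolean hasNext() {
      return delegate.hasNext();
    }

    @Override
    public Integer next() {
      return delegate.next();
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  // Wrapper that is only an Iterator: a close call has nowhere to go.
  static <S, T> Iterator<T> plainTransform(Iterator<S> iter, Function<S, T> fn) {
    return new Iterator<T>() {
      @Override
      public boolean hasNext() {
        return iter.hasNext();
      }

      @Override
      public T next() {
        return fn.apply(iter.next());
      }
    };
  }

  // Wrapper that forwards close() to the wrapped iterator.
  static <S, T> CloseableIter<T> closingTransform(CloseableIter<S> iter, Function<S, T> fn) {
    return new CloseableIter<T>() {
      @Override
      public void close() throws IOException {
        iter.close();
      }

      @Override
      public boolean hasNext() {
        return iter.hasNext();
      }

      @Override
      public T next() {
        return fn.apply(iter.next());
      }
    };
  }

  // Same delegation rule as the snippet quoted above: close only if the iterator is Closeable.
  static void closeIfCloseable(Iterator<?> iterator) {
    if (iterator instanceof Closeable) {
      try {
        ((Closeable) iterator).close();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }
}
```

Calling `closeIfCloseable` on the result of `plainTransform` leaves the source open, whereas calling it on the result of `closingTransform` closes the source.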






[GitHub] [iceberg] rdblue commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453788928



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.orc.OrcBatchReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.SparkOrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class VectorizedSparkOrcReaders {
+
+  private VectorizedSparkOrcReaders() {
+  }
+
+  public static OrcBatchReader<ColumnarBatch> buildReader(Schema expectedSchema, TypeDescription fileSchema,
+      Map<Integer, ?> idToConstant) {
+    Converter converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, fileSchema, new ReadBuilder(idToConstant));
+
+    return batch -> {
+      BaseOrcColumnVector cv = (BaseOrcColumnVector) converter.convert(new StructColumnVector(batch.size, batch.cols),
+          batch.size);
+      ColumnarBatch columnarBatch = new ColumnarBatch(IntStream.range(0, expectedSchema.columns().size())
+          .mapToObj(cv::getChild)
+          .toArray(ColumnVector[]::new));
+      columnarBatch.setNumRows(batch.size);
+      return columnarBatch;
+    };
+  }
+
+  private interface Converter {
+    ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector columnVector, int batchSize);
+  }
+
+  private static class ReadBuilder extends OrcSchemaWithTypeVisitor<Converter> {
+    private final Map<Integer, ?> idToConstant;
+
+    private ReadBuilder(Map<Integer, ?> idToConstant) {
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public Converter record(Types.StructType iStruct, TypeDescription record, List<String> names,
+        List<Converter> fields) {
+      return new StructConverter(iStruct, fields, idToConstant);
+    }
+
+    @Override
+    public Converter list(Types.ListType iList, TypeDescription array, Converter element) {
+      return new ArrayConverter(iList, element);
+    }
+
+    @Override
+    public Converter map(Types.MapType iMap, TypeDescription map, Converter key, Converter value) {
+      return new MapConverter(iMap, key, value);
+    }
+
+    @Override
+    public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
+      final OrcValueReader<?> primitiveValueReader;
+      switch (primitive.getCategory()) {
+        case BOOLEAN:
+          primitiveValueReader = OrcValueReaders.booleans();
+          break;
+        case BYTE:
+          // Iceberg does not have a byte type. Use int
+        case SHORT:
+          // Iceberg does not have a short type. Use int
+        case DATE:
+        case INT:
+          primitiveValueReader = OrcValueReaders.ints();
+          break;
+        case LONG:
+          primitiveValueReader = OrcValueReaders.longs();
+          break;
+        case FLOAT:
+          primitiveValueReader = OrcValueReaders.floats();
+          break;
+        case DOUBLE:
+          primitiveValueReader = OrcValueReaders.doubles();
+          break;
+        case TIMESTAMP_INSTANT:
+          primitiveValueReader = SparkOrcValueReaders.timestampTzs();
+          break;
+        case DECIMAL:
+          primitiveValueReader = SparkOrcValueReaders.decimals(primitive.getPrecision(), primitive.getScale());
+          break;
+        case CHAR:
+        case VARCHAR:
+        case STRING:
+          primitiveValueReader = SparkOrcValueReaders.utf8String();
+          break;
+        case BINARY:
+          primitiveValueReader = OrcValueReaders.bytes();
+          break;
+        default:
+          throw new IllegalArgumentException("Unhandled type " + primitive);
+      }
+      return (columnVector, batchSize) ->
+          new PrimitiveOrcColumnVector(iPrimitive, batchSize, columnVector, primitiveValueReader);
+    }
+  }
+
+  private abstract static class BaseOrcColumnVector extends ColumnVector {
+    private final org.apache.orc.storage.ql.exec.vector.ColumnVector vector;
+    private final int batchSize;
+    private Integer numNulls;
+
+    BaseOrcColumnVector(Type type, int batchSize, org.apache.orc.storage.ql.exec.vector.ColumnVector vector) {
+      super(SparkSchemaUtil.convert(type));
+      this.vector = vector;
+      this.batchSize = batchSize;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public boolean hasNull() {
+      return !vector.noNulls;
+    }
+
+    @Override
+    public int numNulls() {
+      if (numNulls == null) {
+        numNulls = numNullsHelper();
+      }
+      return numNulls;
+    }
+
+    private int numNullsHelper() {
+      if (vector.isRepeating) {
+        if (vector.isNull[0]) {
+          return batchSize;
+        } else {
+          return 0;
+        }
+      } else if (vector.noNulls) {
+        return 0;
+      } else {
+        int count = 0;
+        for (int i = 0; i < batchSize; i++) {
+          if (vector.isNull[i]) {
+            count++;
+          }
+        }
+        return count;
+      }
+    }
+
+    protected int getRowIndex(int rowId) {
+      return vector.isRepeating ? 0 : rowId;
+    }
+
+    @Override
+    public boolean isNullAt(int rowId) {
+      return vector.isNull[getRowIndex(rowId)];
+    }
+
+    @Override
+    public boolean getBoolean(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte getByte(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public short getShort(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getInt(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getLong(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public float getFloat(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public double getDouble(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Decimal getDecimal(int rowId, int precision, int scale) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public UTF8String getUTF8String(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte[] getBinary(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ColumnarArray getArray(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ColumnarMap getMap(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ColumnVector getChild(int ordinal) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class PrimitiveOrcColumnVector extends BaseOrcColumnVector {
+    private final org.apache.orc.storage.ql.exec.vector.ColumnVector vector;
+    private final OrcValueReader<?> primitiveValueReader;
+
+    PrimitiveOrcColumnVector(Type type, int batchSize, org.apache.orc.storage.ql.exec.vector.ColumnVector vector,
+        OrcValueReader<?> primitiveValueReader) {
+      super(type, batchSize, vector);
+      this.vector = vector;
+      this.primitiveValueReader = primitiveValueReader;
+    }
+
+    @Override
+    public boolean getBoolean(int rowId) {
+      Boolean value = (Boolean) primitiveValueReader.read(vector, rowId);
+      return value != null ? value : false;
+    }
+
+    @Override
+    public int getInt(int rowId) {
+      Integer value = (Integer) primitiveValueReader.read(vector, rowId);
+      return value != null ? value : 0;

Review comment:
       Looks like other implementations in Spark just return from the underlying vector, without checking. So these methods would always return something. Arrow is an exception: if null checking is enabled (the default) then it will check and throw `IllegalStateException`.
   
   I think it still makes sense to throw the `NullPointerException`, even if `copy` in Spark 3 would use it. I don't see any uses of `ArrayData.copy` in Spark 3, and any problem would only affect arrays, so the impact is limited. Plus, other sources (Arrow) break in this case and it appears to be overlooked. I think it makes sense to throw an exception to prevent a copy that corrupts the data by dropping null values.
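The corruption risk mentioned above can be sketched from the caller's side. `DefaultingIntColumn` and `CopySketch` are hypothetical names used only for illustration, not Spark or Iceberg classes; the sketch assumes a getter that returns 0 for null rows, as in the diff under review.

```java
import java.util.Arrays;

// Hypothetical minimal column: values plus a null mask, with a getter that returns 0 for nulls.
final class DefaultingIntColumn {
  private final int[] values;
  private final boolean[] isNull;

  DefaultingIntColumn(int[] values, boolean[] isNull) {
    this.values = values;
    this.isNull = isNull;
  }

  int size() {
    return values.length;
  }

  boolean isNullAt(int rowId) {
    return isNull[rowId];
  }

  int getInt(int rowId) {
    return isNull[rowId] ? 0 : values[rowId];
  }
}

final class CopySketch {
  // Naive copy: never consults isNullAt, so null rows silently become 0.
  static Integer[] copyIgnoringNulls(DefaultingIntColumn column) {
    Integer[] out = new Integer[column.size()];
    for (int i = 0; i < out.length; i++) {
      out[i] = column.getInt(i);
    }
    return out;
  }

  // Null-aware copy: checks the mask before calling the getter.
  static Integer[] copyPreservingNulls(DefaultingIntColumn column) {
    Integer[] out = new Integer[column.size()];
    for (int i = 0; i < out.length; i++) {
      if (column.isNullAt(i)) {
        out[i] = null;
      } else {
        out[i] = column.getInt(i);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    DefaultingIntColumn column =
        new DefaultingIntColumn(new int[] {1, 99, 3}, new boolean[] {false, true, false});
    System.out.println(Arrays.toString(copyIgnoringNulls(column)));
    System.out.println(Arrays.toString(copyPreservingNulls(column)));
  }
}
```

With a getter that throws on null rows instead, the naive copy would fail fast rather than produce a value that looks valid.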




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#issuecomment-657802983


   Nice work, @shardulm94! I'm merging this because the blockers have been resolved and this is large. We can follow up with a change for the way nulls are handled in getters. I don't think that is a blocker because there are examples in Spark of both ignoring the issue and throwing exceptions.




[GitHub] [iceberg] shardulm94 commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453143530



##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
##########
@@ -130,4 +140,25 @@ public T next() {
     }
   }
 
+  private static class OrcBatchIterator<T> implements Iterator<T> {

Review comment:
       Fixed in https://github.com/apache/iceberg/pull/1189/commits/7d0cf1cafec9ff42d8e4cb5019185af00ff80d58
   
   Added `CloseableIterator.transform`, updated `VectorizedRowBatchIterator` to use `CloseableIterator`, and also fixed the issue with incorrect use of `CloseableIterator.withClose`.






[GitHub] [iceberg] shardulm94 commented on pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#issuecomment-656914438


   Which cases are you comparing for nested data? For nested data, `readIcebergNonVectorized` vs. `readIcebergVectorized` shows a 2-3x improvement, which is similar to the improvement for flat data. The `readFileSourceVectorized` and `readWithProjectionFileSourceVectorized` cases are not really relevant for nested data, since the file source falls back to row-by-row reading there, so I guess we should just remove them. I modified the Spark unit tests to also cover the vectorized codepaths, so I am assuming those tests check correctness, but I can also do some manual sanity checks.




[GitHub] [iceberg] shardulm94 commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453138021



##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
##########
@@ -130,4 +140,25 @@ public T next() {
     }
   }
 
+  private static class OrcBatchIterator<T> implements Iterator<T> {

Review comment:
       Okay, I am able to make it work if I assign the `transformed` iterator to `Iterator` instead of `Iterator<T>`.






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453143261



##########
File path: orc/src/main/java/org/apache/iceberg/orc/ORC.java
##########
@@ -177,9 +179,20 @@ public ReadBuilder filter(Expression newFilter) {
       return this;
     }
 
+    public ReadBuilder createBatchedReaderFunc(Function<TypeDescription, OrcBatchReader<?>> batchReaderFunction) {
+      this.batchedReaderFunc = batchReaderFunction;

Review comment:
       Fixed in https://github.com/apache/iceberg/pull/1189/commits/d13c3662ac5d7417c647ad5c6c5b22685341dc46






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453143246



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.orc.OrcBatchReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.SparkOrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class VectorizedSparkOrcReaders {
+
+  private VectorizedSparkOrcReaders() {
+  }
+
+  public static OrcBatchReader<ColumnarBatch> buildReader(Schema expectedSchema, TypeDescription fileSchema,
+      Map<Integer, ?> idToConstant) {

Review comment:
       Fixed in https://github.com/apache/iceberg/pull/1189/commits/faeffa368d4bc7766ea2008b351acddd792ad7ef






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453143313



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
##########
@@ -75,6 +103,16 @@
       }
 
       iter = builder.build();
+    } else if (task.file().format() == FileFormat.ORC) {
+      iter = ORC.read(location)
+          .project(expectedSchema)

Review comment:
       Fixed in https://github.com/apache/iceberg/pull/1189/commits/9c2af8d5038b8dd478c39f57729370f7244816a7






[GitHub] [iceberg] rdblue commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453120088



##########
File path: orc/src/main/java/org/apache/iceberg/orc/ORC.java
##########
@@ -177,9 +179,20 @@ public ReadBuilder filter(Expression newFilter) {
       return this;
     }
 
+    public ReadBuilder createBatchedReaderFunc(Function<TypeDescription, OrcBatchReader<?>> batchReaderFunction) {
+      this.batchedReaderFunc = batchReaderFunction;

Review comment:
       This should validate that `createReaderFunc` hasn't also been called. And that function should validate that this one hasn't been called. That way there is no ambiguous behavior and the user gets an error when they try to set up both batch and row readers.
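The mutual validation described above could look roughly like the following. This is a minimal sketch with a hypothetical `SketchReadBuilder`, not the Iceberg `ORC.ReadBuilder`; the function types and the choice of `IllegalArgumentException` are assumptions made for illustration.

```java
import java.util.function.Function;

// Hypothetical builder for illustration only; it is not the Iceberg ORC.ReadBuilder.
final class SketchReadBuilder {
  private Function<String, Object> readerFunc;
  private Function<String, Object> batchedReaderFunc;

  SketchReadBuilder createReaderFunc(Function<String, Object> newReaderFunc) {
    // Assumption: IllegalArgumentException signals the conflicting setup.
    if (batchedReaderFunc != null) {
      throw new IllegalArgumentException(
          "Cannot set reader function: batched reader function is already set");
    }
    this.readerFunc = newReaderFunc;
    return this;
  }

  SketchReadBuilder createBatchedReaderFunc(Function<String, Object> newBatchedReaderFunc) {
    if (readerFunc != null) {
      throw new IllegalArgumentException(
          "Cannot set batched reader function: reader function is already set");
    }
    this.batchedReaderFunc = newBatchedReaderFunc;
    return this;
  }

  boolean isBatched() {
    return batchedReaderFunc != null;
  }
}
```

Because each setter checks the other field, the error is raised at configuration time, whichever order the two calls are made in.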






[GitHub] [iceberg] rdblue merged pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189


   




[GitHub] [iceberg] shardulm94 commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453137191



##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
##########
@@ -130,4 +140,25 @@ public T next() {
     }
   }
 
+  private static class OrcBatchIterator<T> implements Iterator<T> {

Review comment:
       I remember trying this too. `OrcBatchReader` is a functional interface, so it should not have issues converting the reader to a Function. The issue seems to be the capture type.
   
   ![image](https://user-images.githubusercontent.com/6961317/87213393-7e3f7080-c2d9-11ea-9797-d7c717867391.png)
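One common way a capture-type problem like this arises can be reproduced in plain Java. The sketch below uses a hypothetical `SketchBatchReader` interface and `CaptureSketch` class rather than the Iceberg types, and it is only a guess at the shape of the issue in the screenshot: when the reader is typed with a wildcard, the transformed iterator can be assigned to `Iterator<?>`, but not to `Iterator<T>` without an unchecked cast or a raw type.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical single-method reader, standing in for a functional interface such as OrcBatchReader.
interface SketchBatchReader<T> {
  T read(int[] batch);
}

final class CaptureSketch {
  // Generic helper: T is inferred from the reader, even when the argument's type is a wildcard capture.
  static <T> Iterator<T> transform(Iterator<int[]> batches, SketchBatchReader<T> reader) {
    return new Iterator<T>() {
      @Override
      public boolean hasNext() {
        return batches.hasNext();
      }

      @Override
      public T next() {
        return reader.read(batches.next());
      }
    };
  }

  // A factory whose result type is unknown at compile time, hence the wildcard.
  static SketchBatchReader<?> readerFor(String schema) {
    return batch -> schema + ":" + batch.length;
  }

  public static void main(String[] args) {
    Iterator<int[]> batches = Arrays.asList(new int[] {1, 2}, new int[] {3}).iterator();
    // Compiles: the captured type flows through transform and widens back to a wildcard.
    Iterator<?> rows = transform(batches, readerFor("s"));
    // Assigning the result to Iterator<String> instead would not compile without a cast.
    while (rows.hasNext()) {
      System.out.println(rows.next());
    }
  }
}
```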
   






[GitHub] [iceberg] megshao commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
megshao commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r710835889



##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
##########
@@ -130,4 +140,25 @@ public T next() {
     }
   }
 
+  private static class OrcBatchIterator<T> implements Iterator<T> {

Review comment:
       Over on the PaymentFlowInfoObject side~






[GitHub] [iceberg] shardulm94 commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453124496



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
##########
@@ -75,6 +103,16 @@
       }
 
       iter = builder.build();
+    } else if (task.file().format() == FileFormat.ORC) {
+      iter = ORC.read(location)
+          .project(expectedSchema)

Review comment:
       I will change this to not project constant columns, which avoids materializing a `ColumnVector`, similar to what we did in #1191.






[GitHub] [iceberg] rdblue commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453136480



##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
##########
@@ -130,4 +140,25 @@ public T next() {
     }
   }
 
+  private static class OrcBatchIterator<T> implements Iterator<T> {

Review comment:
       What about `Iterators.transform(rowBatchIterator, batchReaderFunction.apply(readOrcSchema)::read)`? That should allow the compiler to convert it to a Function.






[GitHub] [iceberg] rdblue commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453122361



##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
##########
@@ -130,4 +140,25 @@ public T next() {
     }
   }
 
+  private static class OrcBatchIterator<T> implements Iterator<T> {

Review comment:
       This needs to be a `CloseableIterator` to avoid leaking an open file. I'm also wondering if we could replace this implementation with a method like `CloseableIterable.transform` for `CloseableIterator`. All this is doing is calling a method on the result of another `Iterator`.
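A transform helper of the kind suggested above might be sketched as follows. `ClosingIterator` and `ListSource` are hypothetical names for this example and do not reproduce Iceberg's `CloseableIterator`; the point is only that the wrapper applies a function to each element and forwards `close()` to the source, so the underlying file handle is not leaked.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical interface for this sketch; close() is narrowed so it throws no checked exception.
interface ClosingIterator<T> extends Iterator<T>, Closeable {
  @Override
  void close();

  // Wraps a source iterator, mapping each element and delegating close() to the source.
  static <I, O> ClosingIterator<O> transform(
      ClosingIterator<I> source, Function<? super I, ? extends O> fn) {
    return new ClosingIterator<O>() {
      @Override
      public boolean hasNext() {
        return source.hasNext();
      }

      @Override
      public O next() {
        return fn.apply(source.next());
      }

      @Override
      public void close() {
        source.close();
      }
    };
  }
}

// Hypothetical in-memory source that records whether it was closed.
final class ListSource implements ClosingIterator<Integer> {
  private final Iterator<Integer> delegate;
  boolean closed = false;

  ListSource(List<Integer> values) {
    this.delegate = values.iterator();
  }

  @Override
  public boolean hasNext() {
    return delegate.hasNext();
  }

  @Override
  public Integer next() {
    return delegate.next();
  }

  @Override
  public void close() {
    closed = true;
  }
}
```

Since the wrapper is itself a `Closeable`, it can be used in a try-with-resources block, and closing it closes the source.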






[GitHub] [iceberg] rdblue commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453125375



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.orc.OrcBatchReader;
+import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
+import org.apache.iceberg.orc.OrcValueReader;
+import org.apache.iceberg.orc.OrcValueReaders;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.SparkOrcValueReaders;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
+import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
+import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class VectorizedSparkOrcReaders {
+
+  private VectorizedSparkOrcReaders() {
+  }
+
+  public static OrcBatchReader<ColumnarBatch> buildReader(Schema expectedSchema, TypeDescription fileSchema,
+      Map<Integer, ?> idToConstant) {
+    Converter converter = OrcSchemaWithTypeVisitor.visit(expectedSchema, fileSchema, new ReadBuilder(idToConstant));
+
+    return batch -> {
+      BaseOrcColumnVector cv = (BaseOrcColumnVector) converter.convert(new StructColumnVector(batch.size, batch.cols),
+          batch.size);
+      ColumnarBatch columnarBatch = new ColumnarBatch(IntStream.range(0, expectedSchema.columns().size())
+          .mapToObj(cv::getChild)
+          .toArray(ColumnVector[]::new));
+      columnarBatch.setNumRows(batch.size);
+      return columnarBatch;
+    };
+  }
+
+  private interface Converter {
+    ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector columnVector, int batchSize);
+  }
+
+  private static class ReadBuilder extends OrcSchemaWithTypeVisitor<Converter> {
+    private final Map<Integer, ?> idToConstant;
+
+    private ReadBuilder(Map<Integer, ?> idToConstant) {
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public Converter record(Types.StructType iStruct, TypeDescription record, List<String> names,
+        List<Converter> fields) {
+      return new StructConverter(iStruct, fields, idToConstant);
+    }
+
+    @Override
+    public Converter list(Types.ListType iList, TypeDescription array, Converter element) {
+      return new ArrayConverter(iList, element);
+    }
+
+    @Override
+    public Converter map(Types.MapType iMap, TypeDescription map, Converter key, Converter value) {
+      return new MapConverter(iMap, key, value);
+    }
+
+    @Override
+    public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive) {
+      final OrcValueReader<?> primitiveValueReader;
+      switch (primitive.getCategory()) {
+        case BOOLEAN:
+          primitiveValueReader = OrcValueReaders.booleans();
+          break;
+        case BYTE:
+          // Iceberg does not have a byte type. Use int
+        case SHORT:
+          // Iceberg does not have a short type. Use int
+        case DATE:
+        case INT:
+          primitiveValueReader = OrcValueReaders.ints();
+          break;
+        case LONG:
+          primitiveValueReader = OrcValueReaders.longs();
+          break;
+        case FLOAT:
+          primitiveValueReader = OrcValueReaders.floats();
+          break;
+        case DOUBLE:
+          primitiveValueReader = OrcValueReaders.doubles();
+          break;
+        case TIMESTAMP_INSTANT:
+          primitiveValueReader = SparkOrcValueReaders.timestampTzs();
+          break;
+        case DECIMAL:
+          primitiveValueReader = SparkOrcValueReaders.decimals(primitive.getPrecision(), primitive.getScale());
+          break;
+        case CHAR:
+        case VARCHAR:
+        case STRING:
+          primitiveValueReader = SparkOrcValueReaders.utf8String();
+          break;
+        case BINARY:
+          primitiveValueReader = OrcValueReaders.bytes();
+          break;
+        default:
+          throw new IllegalArgumentException("Unhandled type " + primitive);
+      }
+      return (columnVector, batchSize) ->
+          new PrimitiveOrcColumnVector(iPrimitive, batchSize, columnVector, primitiveValueReader);
+    }
+  }
+
+  private abstract static class BaseOrcColumnVector extends ColumnVector {
+    private final org.apache.orc.storage.ql.exec.vector.ColumnVector vector;
+    private final int batchSize;
+    private Integer numNulls;
+
+    BaseOrcColumnVector(Type type, int batchSize, org.apache.orc.storage.ql.exec.vector.ColumnVector vector) {
+      super(SparkSchemaUtil.convert(type));
+      this.vector = vector;
+      this.batchSize = batchSize;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public boolean hasNull() {
+      return !vector.noNulls;
+    }
+
+    @Override
+    public int numNulls() {
+      if (numNulls == null) {
+        numNulls = numNullsHelper();
+      }
+      return numNulls;
+    }
+
+    private int numNullsHelper() {
+      if (vector.isRepeating) {
+        if (vector.isNull[0]) {
+          return batchSize;
+        } else {
+          return 0;
+        }
+      } else if (vector.noNulls) {
+        return 0;
+      } else {
+        int count = 0;
+        for (int i = 0; i < batchSize; i++) {
+          if (vector.isNull[i]) {
+            count++;
+          }
+        }
+        return count;
+      }
+    }
+
+    protected int getRowIndex(int rowId) {
+      return vector.isRepeating ? 0 : rowId;
+    }
+
+    @Override
+    public boolean isNullAt(int rowId) {
+      return vector.isNull[getRowIndex(rowId)];
+    }
+
+    @Override
+    public boolean getBoolean(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte getByte(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public short getShort(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getInt(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getLong(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public float getFloat(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public double getDouble(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Decimal getDecimal(int rowId, int precision, int scale) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public UTF8String getUTF8String(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte[] getBinary(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ColumnarArray getArray(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ColumnarMap getMap(int rowId) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ColumnVector getChild(int ordinal) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class PrimitiveOrcColumnVector extends BaseOrcColumnVector {
+    private final org.apache.orc.storage.ql.exec.vector.ColumnVector vector;
+    private final OrcValueReader<?> primitiveValueReader;
+
+    PrimitiveOrcColumnVector(Type type, int batchSize, org.apache.orc.storage.ql.exec.vector.ColumnVector vector,
+        OrcValueReader<?> primitiveValueReader) {
+      super(type, batchSize, vector);
+      this.vector = vector;
+      this.primitiveValueReader = primitiveValueReader;
+    }
+
+    @Override
+    public boolean getBoolean(int rowId) {
+      Boolean value = (Boolean) primitiveValueReader.read(vector, rowId);
+      return value != null ? value : false;
+    }
+
+    @Override
+    public int getInt(int rowId) {
+      Integer value = (Integer) primitiveValueReader.read(vector, rowId);
+      return value != null ? value : 0;

Review comment:
       Why this extra check just to add a default value? If it is okay to return a default value, then `getInt` should never be called for a `rowId` where the value is null. And if this is never called when the value is null, then I'd rather directly cast. That way, a `NullPointerException` is thrown if the method contract is violated instead of silently returning the wrong value.
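
       To illustrate the trade-off raised in this comment, here is a minimal, self-contained sketch. `BoxedReader` and `NullContractDemo` are hypothetical stand-ins, not part of Iceberg or Spark; the only assumption is that the reader returns a boxed value that may be null.

```java
// Hypothetical stand-in for a value reader that returns boxed values (not the Iceberg API).
interface BoxedReader {
  Object read(int rowId);
}

final class NullContractDemo {
  // Variant from the diff: a null value silently becomes 0.
  static int getIntWithDefault(BoxedReader reader, int rowId) {
    Integer value = (Integer) reader.read(rowId);
    return value != null ? value : 0;
  }

  // Variant suggested in the review: unboxing a null throws NullPointerException,
  // so a caller that skipped the isNullAt check fails loudly.
  static int getIntDirect(BoxedReader reader, int rowId) {
    return (Integer) reader.read(rowId);
  }

  public static void main(String[] args) {
    Integer[] column = {7, null};
    BoxedReader reader = rowId -> column[rowId];

    System.out.println(getIntWithDefault(reader, 1)); // null is masked by the default
    try {
      getIntDirect(reader, 1);
    } catch (NullPointerException e) {
      System.out.println("contract violation surfaced as NullPointerException");
    }
  }
}
```

       With the direct cast, a caller that reads a null row without checking for null first gets an exception instead of a value that looks valid.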




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453120871



##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
##########
@@ -42,19 +42,26 @@
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
 
-
-class SparkOrcValueReaders {
+public class SparkOrcValueReaders {

Review comment:
       I like the changes here and in `SparkOrcReader`, but they aren't really needed for this PR and increase the number of files that are touched. In general, I prefer keeping changes like this separate to avoid unnecessary conflicts, and so that we can revert commits without removing unrelated clean-up.






[GitHub] [iceberg] rdblue commented on pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#issuecomment-656944576


   The only blocker I see is that the batch iterator doesn't pass the close call down to the underlying ORC reader. Otherwise, everything is minor.
   
   Awesome work, @shardulm94!
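
   As a rough illustration of the blocker described above, the sketch below shows an iterator that forwards `close` to the reader it wraps. `FakeBatchReader` and `ClosingBatchIterator` are hypothetical names invented for this example; they are not the classes in `OrcIterable`, and the real ORC reader API is not used here.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical stand-in for an underlying reader that holds a resource.
final class FakeBatchReader implements Closeable {
  private int remaining;
  boolean closed = false;

  FakeBatchReader(int batches) {
    this.remaining = batches;
  }

  boolean hasNext() {
    return remaining > 0;
  }

  int nextBatch() {
    return remaining--;
  }

  @Override
  public void close() {
    closed = true;
  }
}

// Iterator that is also Closeable and passes close() down to the wrapped reader.
final class ClosingBatchIterator implements Iterator<Integer>, Closeable {
  private final FakeBatchReader reader;

  ClosingBatchIterator(FakeBatchReader reader) {
    this.reader = reader;
  }

  @Override
  public boolean hasNext() {
    return reader.hasNext();
  }

  @Override
  public Integer next() {
    if (!reader.hasNext()) {
      throw new NoSuchElementException();
    }
    return reader.nextBatch();
  }

  @Override
  public void close() {
    reader.close(); // without this line the underlying resource would leak
  }

  public static void main(String[] args) {
    FakeBatchReader reader = new FakeBatchReader(2);
    try (ClosingBatchIterator batches = new ClosingBatchIterator(reader)) {
      while (batches.hasNext()) {
        batches.next();
      }
    }
    System.out.println("reader closed: " + reader.closed);
  }
}
```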




[GitHub] [iceberg] shardulm94 commented on a change in pull request #1189: Spark: Support ORC vectorized reads

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #1189:
URL: https://github.com/apache/iceberg/pull/1189#discussion_r453134121



##########
File path: orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java
##########
@@ -130,4 +140,25 @@ public T next() {
     }
   }
 
+  private static class OrcBatchIterator<T> implements Iterator<T> {

Review comment:
       This `Iterator` is converted to a `CloseableIterator` at https://github.com/apache/iceberg/blob/6a5e8c53d3950922b8f6bd753329b7d943ac423d/orc/src/main/java/org/apache/iceberg/orc/OrcIterable.java#L89
   I would also like to avoid creating the `OrcBatchIterator` class. However, I am unable to make `Iterators.transform` work here because of the capture type of `batchReaderFunction`.
   
   ![image](https://user-images.githubusercontent.com/6961317/87212841-92816e80-c2d5-11ea-9fb8-77f16f12206f.png)
   
   I am also not able to cast the `batchReaderFunction` to the generic type used by `OrcIterable`.
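
   For context, the sketch below shows one general Java pattern for this kind of wildcard problem: a generic helper method gives the captured type a name, and a single unchecked cast narrows the result. This is only an assumption about the shape of the issue; `WildcardCaptureSketch`, `mapBatches`, and `readAs` are hypothetical, use `java.util.function.Function` rather than the actual `batchReaderFunction` type, and have not been checked against `OrcIterable`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

final class WildcardCaptureSketch {
  // Generic helper: inside this method the wildcard's element type has a name (R),
  // so the mapped iterator can be typed without any cast.
  static <R> Iterator<R> mapBatches(Iterator<Integer> batches, Function<Integer, R> reader) {
    return new Iterator<R>() {
      @Override
      public boolean hasNext() {
        return batches.hasNext();
      }

      @Override
      public R next() {
        return reader.apply(batches.next());
      }
    };
  }

  // The caller only knows the reader as Function<Integer, ?>. The helper call captures
  // the wildcard, and one unchecked cast narrows the result to the caller's type T.
  // The cast is unchecked: T must really match what the reader produces.
  @SuppressWarnings("unchecked")
  static <T> Iterator<T> readAs(Iterator<Integer> batches, Function<Integer, ?> reader) {
    Iterator<?> mapped = mapBatches(batches, reader);
    return (Iterator<T>) mapped;
  }

  public static void main(String[] args) {
    Function<Integer, ?> reader = batch -> "batch-" + batch;
    Iterator<String> it = readAs(Arrays.asList(1, 2).iterator(), reader);
    while (it.hasNext()) {
      System.out.println(it.next());
    }
  }
}
```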
   



