Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/09 15:04:05 UTC

[GitHub] [iceberg] rymurr commented on a change in pull request #2286: Add Arrow vectorized reader

rymurr commented on a change in pull request #2286:
URL: https://github.com/apache/iceberg/pull/2286#discussion_r585433534



##########
File path: data/src/main/java/org/apache/iceberg/data/IcebergGenerics.java
##########
@@ -47,6 +47,10 @@ public ScanBuilder(Table table) {
       this.tableScan = table.newScan();
     }
 
+    public TableScan getTableScan() {

Review comment:
       This seems odd to me at first glance. What is the motivation for this change? Apologies if it becomes obvious later; I am early in my review.

##########
File path: data/src/main/java/org/apache/iceberg/data/arrow/ArrowFileScanTaskReader.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.arrow;
+
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Reads the data file and returns an iterator of {@link VectorSchemaRoot}. Only Parquet data file format is supported.
+ */
+public class ArrowFileScanTaskReader extends BaseArrowFileScanTaskReader<VectorSchemaRoot> {
+
+  private final Schema expectedSchema;
+  private final String nameMapping;
+  private final boolean caseSensitive;
+  private final int batchSize;
+  private final boolean reuseContainers;
+
+  /**
+   * Create a new instance.
+   *
+   * @param task              Combined file scan task.
+   * @param expectedSchema    Read schema. The returned data will have this schema.
+   * @param nameMapping       Mapping from external schema names to Iceberg type IDs.
+   * @param fileIo            File I/O.
+   * @param encryptionManager Encryption manager.
+   * @param caseSensitive     Indicates whether column names are case sensitive.
+   * @param batchSize         Batch size in number of rows. Each Arrow batch ({@link VectorSchemaRoot}) contains a
+   *                          maximum of {@code batchSize} rows.
+   * @param reuseContainers   Reuse Arrow vectors from the previous {@link VectorSchemaRoot} in the next {@link

Review comment:
       When should this boolean be flipped, and what are the consequences of doing so? A bit more documentation would help.
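
   To make the question concrete, here is a minimal sketch of what such documentation might need to spell out. It is not the PR's code: `BatchIteratorSketch` is a hypothetical name, plain `int[]` arrays stand in for Arrow vectors, and the semantics shown (a previously returned batch is overwritten by the next call when reuse is enabled) are an assumption based only on the Javadoc wording above.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical stand-in for a batch reader; int[] plays the role of an Arrow vector. */
final class BatchIteratorSketch implements Iterator<int[]> {
  private final int[] rows;
  private final int batchSize;
  private final boolean reuseContainers;
  private int position = 0;
  private int[] container = null;

  BatchIteratorSketch(int[] rows, int batchSize, boolean reuseContainers) {
    this.rows = rows;
    this.batchSize = batchSize;
    this.reuseContainers = reuseContainers;
  }

  @Override
  public boolean hasNext() {
    return position < rows.length;
  }

  @Override
  public int[] next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    // Assumed semantics: with reuse enabled, the container handed out
    // by the previous call is overwritten in place.
    if (!reuseContainers || container == null) {
      container = new int[batchSize];
    }
    int count = Math.min(batchSize, rows.length - position);
    System.arraycopy(rows, position, container, 0, count);
    // Clear stale values left over when the final batch is short.
    Arrays.fill(container, count, batchSize, 0);
    position += count;
    return container;
  }
}
```

   Under these assumed semantics, a caller that keeps the array returned by the first `next()` sees the second batch's values after calling `next()` again. With reuse disabled, each batch stays valid, at the cost of a fresh allocation per batch.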

##########
File path: data/src/main/java/org/apache/iceberg/data/arrow/BaseArrowFileScanTaskReader.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.arrow;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for vectorized Arrow reader.
+ */
+abstract class BaseArrowFileScanTaskReader<T> implements CloseableIterator<T> {

Review comment:
       I think this should be called `VectorizedTableScanIterable`, similar to `TableScanIterable` but for vectorized reads. I am not sure it should be parameterised. As discussed before, I would prefer to expose a simpler interface than `VectorSchemaRoot`: something similar to `ColumnBatch`, but not using the actual Spark code. The idea is that it can be a non-Arrow-specific wrapper around `VectorSchemaRoot` that deals with lifetime etc. for 'regular' users and exposes a getter for `VectorSchemaRoot` for expert users.
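
   A rough sketch of the wrapper idea, under stated assumptions: `ColumnarBatchSketch`, `numRows()` and `unwrap()` are hypothetical names, and the type parameter `R` stands in for `VectorSchemaRoot` (which is `AutoCloseable`) so that the example compiles without Arrow on the classpath. It only illustrates owning the lifetime of the underlying container while still exposing it to expert users.

```java
/** Hypothetical wrapper; R stands in for Arrow's VectorSchemaRoot. */
final class ColumnarBatchSketch<R extends AutoCloseable> implements AutoCloseable {
  private final R root;
  private final int numRows;
  private boolean closed = false;

  ColumnarBatchSketch(R root, int numRows) {
    this.root = root;
    this.numRows = numRows;
  }

  /** Number of rows in this batch; safe for 'regular' users. */
  int numRows() {
    return numRows;
  }

  /** Expert access to the underlying container; valid only until close(). */
  R unwrap() {
    if (closed) {
      throw new IllegalStateException("Batch already closed");
    }
    return root;
  }

  /** Releases the underlying container exactly once. */
  @Override
  public void close() {
    if (closed) {
      return;
    }
    closed = true;
    try {
      root.close();
    } catch (Exception e) {
      throw new IllegalStateException("Failed to release batch", e);
    }
  }
}
```

   Regular users would only see `numRows()` and whatever typed accessors the wrapper grows; the getter is the escape hatch for callers who want the raw Arrow container and accept its lifetime rules.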

##########
File path: build.gradle
##########
@@ -239,6 +239,8 @@ project(':iceberg-data') {
     compile project(':iceberg-core')
     compileOnly project(':iceberg-parquet')
     compileOnly project(':iceberg-orc')
+    compileOnly project(':iceberg-arrow')

Review comment:
       I am not sure how this works as `compileOnly`. It means the dependency won't be added to downstream modules (e.g. Spark), where there may be different Arrow versions, but it also means no one can use it unless they explicitly pull in all the Arrow deps themselves.
   
   Perhaps everything can go into the arrow module? If it is required here, we can instantiate it by reflection if the arrow module is present on the classpath.
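
   For illustration, the reflection approach could look roughly like the sketch below. `OptionalModuleLoader` and `tryLoad` are hypothetical names, not existing Iceberg APIs, and the sketch assumes the optional implementation has a public no-arg constructor.

```java
import java.util.Optional;

/** Hypothetical helper: instantiates a class only if it is on the classpath. */
final class OptionalModuleLoader {
  private OptionalModuleLoader() {
  }

  /**
   * Tries to instantiate className through its public no-arg constructor.
   * Returns an empty Optional when the class, or a class it links to, is missing.
   */
  static <T> Optional<T> tryLoad(String className, Class<T> expectedType) {
    try {
      Class<?> clazz = Class.forName(className);
      Object instance = clazz.getDeclaredConstructor().newInstance();
      return Optional.of(expectedType.cast(instance));
    } catch (ClassNotFoundException | NoClassDefFoundError e) {
      // The optional module is not on the classpath.
      return Optional.empty();
    } catch (ReflectiveOperationException | ClassCastException e) {
      // The class is present but cannot be used as expected.
      throw new IllegalStateException("Cannot instantiate " + className, e);
    }
  }
}
```

   The data module would then compile without any Arrow types in its signatures, and callers that do have the arrow module on the classpath get the vectorized implementation at runtime.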

##########
File path: build.gradle
##########
@@ -250,13 +252,25 @@ project(':iceberg-data') {
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
     }
 
+    compileOnly("org.apache.arrow:arrow-vector") {

Review comment:
       What is this needed for? I think we should keep everything `arrow` in the arrow module.

##########
File path: data/src/main/java/org/apache/iceberg/data/arrow/VectorSchemaRootReader.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.arrow;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.iceberg.arrow.vectorized.VectorHolder;
+import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * A collection of vectorized readers per column (in the expected read schema) and Arrow Vector holders. This class owns
+ * the Arrow vectors and is responsible for closing the Arrow vectors.
+ */
+public class VectorSchemaRootReader implements VectorizedReader<VectorSchemaRoot> {

Review comment:
       Rather than reimplementing this, perhaps it can use the existing `VectorizedArrowReader`?

##########
File path: data/src/main/java/org/apache/iceberg/data/arrow/ArrowFileScanTaskReader.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.arrow;
+
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Reads the data file and returns an iterator of {@link VectorSchemaRoot}. Only Parquet data file format is supported.
+ */
+public class ArrowFileScanTaskReader extends BaseArrowFileScanTaskReader<VectorSchemaRoot> {

Review comment:
       I think this can be folded into the base class.

##########
File path: data/src/main/java/org/apache/iceberg/data/arrow/VectorizedParquetReaders.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.arrow;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.arrow.ArrowAllocation;
+import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+/**
+ * Builds an {@link VectorSchemaRootReader}.
+ */
+public class VectorizedParquetReaders {

Review comment:
       What is the difference between this and the Spark one? Can we just merge them? I would think that this could go in the `arrow` module and be called from Spark.

##########
File path: data/src/main/java/org/apache/iceberg/data/arrow/ArrowReader.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data.arrow;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Vectorized reader that returns an iterator of {@link VectorSchemaRoot}. See {@link #iterator()} to learn about the
+ * behavior of the iterator.
+ *
+ * <p>The following Iceberg data types are supported and have been tested:
+ * <ul>
+ *     <li>Iceberg: {@link Types.BooleanType}, Arrow: {@link MinorType#BIT}</li>
+ *     <li>Iceberg: {@link Types.IntegerType}, Arrow: {@link MinorType#INT}</li>
+ *     <li>Iceberg: {@link Types.LongType}, Arrow: {@link MinorType#BIGINT}</li>
+ *     <li>Iceberg: {@link Types.FloatType}, Arrow: {@link MinorType#FLOAT4}</li>
+ *     <li>Iceberg: {@link Types.DoubleType}, Arrow: {@link MinorType#FLOAT8}</li>
+ *     <li>Iceberg: {@link Types.StringType}, Arrow: {@link MinorType#VARCHAR}</li>
+ *     <li>Iceberg: {@link Types.TimestampType} (both with and without timezone),
+ *         Arrow: {@link MinorType#TIMEMICRO}</li>
+ *     <li>Iceberg: {@link Types.BinaryType}, Arrow: {@link MinorType#VARBINARY}</li>
+ *     <li>Iceberg: {@link Types.DateType}, Arrow: {@link MinorType#DATEDAY}</li>
+ * </ul>
+ *
+ * <p>Features that don't work in this implementation:
+ * <ul>
+ *     <li>Type promotion: In case of type promotion, the Arrow vector corresponding to
+ *     the data type in the parquet file is returned instead of the data type in the latest schema.</li>
+ *     <li>Columns with constant values are physically encoded as a dictionary. The Arrow vector
+ *     type is int32 instead of the type as per the schema.</li>
+ *     <li>Data types: {@link Types.TimeType}, {@link Types.ListType}, {@link Types.MapType},
+ *     {@link Types.StructType}</li>
+ * </ul>
+ *
+ * <p>Data types not tested: {@link Types.UUIDType}, {@link Types.FixedType}, {@link Types.DecimalType}.
+ */
+public class ArrowReader extends CloseableGroup implements CloseableIterable<VectorSchemaRoot> {

Review comment:
       Can this follow the same pattern as `GenericReader`? Does it need to be iterable, or can it serve iterables?



