Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/09/17 18:36:55 UTC

[iceberg] branch master updated: Arrow: Fix assertions and correctness checks (#2933)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f220f25  Arrow: Fix assertions and correctness checks (#2933)
f220f25 is described below

commit f220f2534f3ae6e77c4ab9cae2ebe02185655d6a
Author: mayursrivastava <ma...@twosigma.com>
AuthorDate: Fri Sep 17 14:36:45 2021 -0400

    Arrow: Fix assertions and correctness checks (#2933)
---
 .../iceberg/arrow/vectorized/ArrowReader.java      | 68 +++++++++++++++++-----
 .../iceberg/arrow/vectorized/ColumnVector.java     |  2 +
 2 files changed, 56 insertions(+), 14 deletions(-)
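
For context, this is how the reader is typically driven end to end. A minimal
sketch: the table variable and column names are hypothetical, while the
ArrowReader(scan, batchSize, reuseContainers) constructor and open() are the
entry points of this module:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.arrow.vectorized.ArrowReader;
    import org.apache.iceberg.arrow.vectorized.ColumnarBatch;
    import org.apache.iceberg.io.CloseableIterator;

    static void readVectorized(Table table) throws Exception {
      // Project at least one column; an empty projection is rejected by this patch.
      TableScan scan = table.newScan().select("id", "data");
      try (ArrowReader reader = new ArrowReader(scan, 1024, false);
           CloseableIterator<ColumnarBatch> batches = reader.open(scan.planTasks())) {
        while (batches.hasNext()) {
          ColumnarBatch batch = batches.next();
          System.out.println("read " + batch.numRows() + " rows");
        }
      }
    }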

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
index 2386ce5..f09774e 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -49,9 +50,16 @@ import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ExceptionUtil;
+import org.apache.iceberg.util.TableScanUtil;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Vectorized reader that returns an iterator of {@link ColumnarBatch}.
@@ -70,6 +78,8 @@ import org.apache.parquet.schema.MessageType;
  *         Arrow: {@link MinorType#TIMEMICRO}</li>
  *     <li>Iceberg: {@link Types.BinaryType}, Arrow: {@link MinorType#VARBINARY}</li>
  *     <li>Iceberg: {@link Types.DateType}, Arrow: {@link MinorType#DATEDAY}</li>
+ *     <li>Iceberg: {@link Types.TimeType}, Arrow: {@link MinorType#TIMEMICRO}</li>
+ *     <li>Iceberg: {@link Types.UUIDType}, Arrow: {@link MinorType#FIXEDSIZEBINARY}(16)</li>
  * </ul>
  *
  * <p>Features that don't work in this implementation:
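
The two new mappings can be observed directly on the returned vectors. A
sketch, assuming a batch whose first column is an Iceberg time and whose
second is a uuid (the ordinals and the batch variable are hypothetical; the
vector classes are Arrow's standard ones):

    import org.apache.arrow.vector.FixedSizeBinaryVector;
    import org.apache.arrow.vector.TimeMicroVector;

    // A time column surfaces as TIMEMICRO: microseconds since midnight.
    TimeMicroVector times = (TimeMicroVector) batch.column(0).getFieldVector();
    long micros = times.get(0);

    // A uuid column surfaces as FIXEDSIZEBINARY(16): the raw 16-byte value.
    FixedSizeBinaryVector uuids = (FixedSizeBinaryVector) batch.column(1).getFieldVector();
    byte[] uuidBytes = uuids.get(0);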
@@ -84,11 +94,26 @@ import org.apache.parquet.schema.MessageType;
  *     {@link Types.StructType}, {@link Types.FixedType} and
  *     {@link Types.DecimalType}
  *     See https://github.com/apache/iceberg/issues/2485 and https://github.com/apache/iceberg/issues/2486.</li>
- *     <li>Iceberg v2 spec is not supported.
+ *     <li>Delete files are not supported.
  *     See https://github.com/apache/iceberg/issues/2487.</li>
  * </ul>
  */
 public class ArrowReader extends CloseableGroup {
+  private static final Logger LOG = LoggerFactory.getLogger(ArrowReader.class);
+
+  private static final Set<TypeID> SUPPORTED_TYPES = ImmutableSet.of(
+      TypeID.BOOLEAN,
+      TypeID.INTEGER,
+      TypeID.LONG,
+      TypeID.FLOAT,
+      TypeID.DOUBLE,
+      TypeID.STRING,
+      TypeID.TIMESTAMP,
+      TypeID.BINARY,
+      TypeID.DATE,
+      TypeID.UUID,
+      TypeID.TIME
+  );
 
   private final Schema schema;
   private final FileIO io;
@@ -136,6 +161,14 @@ public class ArrowReader extends CloseableGroup {
    * previous {@link ColumnarBatch} may be reused for the next {@link ColumnarBatch}.
    * This implies that the caller should either use the {@link ColumnarBatch} or deep copy the
    * {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+   * <p>
+   * This method works only when the following conditions are true:
+   * <ol>
+   *     <li>At least one column is queried,</li>
+   *     <li>There are no delete files, and</li>
+   *     <li>Only supported data types are queried (see {@link #SUPPORTED_TYPES}).</li>
+   * </ol>
+   * If any of these conditions is not met, an {@link UnsupportedOperationException} is thrown.
    */
   public CloseableIterator<ColumnarBatch> open(CloseableIterable<CombinedScanTask> tasks) {
     CloseableIterator<ColumnarBatch> itr = new VectorizedCombinedScanIterator(
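
In practice, "use or deep copy" means materializing any values you still need
before advancing the iterator, because with reuseContainers=true the same
Arrow buffers back every batch. A sketch using this module's ColumnVector
accessors ('batches' is the iterator returned by open(); the column ordinal
and target list are illustrative):

    List<Long> ids = new ArrayList<>();
    while (batches.hasNext()) {
      ColumnarBatch batch = batches.next();
      ColumnVector idColumn = batch.column(0);
      for (int row = 0; row < batch.numRows(); row++) {
        if (!idColumn.isNullAt(row)) {
          // getLong copies the primitive out of the (possibly reused) buffer
          ids.add(idColumn.getLong(row));
        }
      }
    }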
@@ -163,7 +196,6 @@ public class ArrowReader extends CloseableGroup {
    */
   private static final class VectorizedCombinedScanIterator implements CloseableIterator<ColumnarBatch> {
 
-    private final List<FileScanTask> fileTasks;
     private final Iterator<FileScanTask> fileItr;
     private final Map<String, InputFile> inputFiles;
     private final Schema expectedSchema;
@@ -206,15 +238,30 @@ public class ArrowReader extends CloseableGroup {
         boolean caseSensitive,
         int batchSize,
         boolean reuseContainers) {
-      this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+      List<FileScanTask> fileTasks = StreamSupport.stream(tasks.spliterator(), false)
           .map(CombinedScanTask::files)
           .flatMap(Collection::stream)
           .collect(Collectors.toList());
       this.fileItr = fileTasks.iterator();
 
+      if (fileTasks.stream().anyMatch(TableScanUtil::hasDeletes)) {
+        throw new UnsupportedOperationException("Cannot read files that require applying delete files");
+      }
+
+      if (expectedSchema.columns().isEmpty()) {
+        throw new UnsupportedOperationException("Cannot read without at least one projected column");
+      }
+
+      Set<TypeID> unsupportedTypes = Sets.difference(
+          expectedSchema.columns().stream().map(c -> c.type().typeId()).collect(Collectors.toSet()),
+          SUPPORTED_TYPES);
+      if (!unsupportedTypes.isEmpty()) {
+        throw new UnsupportedOperationException("Cannot read unsupported column types: " + unsupportedTypes);
+      }
+
       Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
       fileTasks.stream()
-          .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+          .map(FileScanTask::file)
           .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
 
       Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
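
With these checks, unsupported requests fail fast in open() instead of
producing incorrect results later. For example (hypothetical: 'amount' is a
decimal column, a type outside SUPPORTED_TYPES):

    TableScan scan = table.newScan().select("amount");
    try (ArrowReader reader = new ArrowReader(scan, 1024, false)) {
      // fails fast: UnsupportedOperationException("Cannot read unsupported column types: ...")
      reader.open(scan.planTasks());
    }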
@@ -252,17 +299,10 @@ public class ArrowReader extends CloseableGroup {
         }
       } catch (IOException | RuntimeException e) {
         if (currentTask != null && !currentTask.isDataTask()) {
-          throw new RuntimeException(
-              "Error reading file: " + getInputFile(currentTask).location() +
-                  ". Reason: the current task is not a data task, i.e. it cannot read data rows. " +
-                  "Ensure that the tasks passed to the constructor are data tasks. " +
-                  "The file scan tasks are: " + fileTasks,
-              e);
-        } else {
-          throw new RuntimeException(
-              "An error occurred while iterating through the file scan tasks or closing the iterator," +
-                  " see the stacktrace for further information. The file scan tasks are: " + fileTasks, e);
+          LOG.error("Error reading file: {}", getInputFile(currentTask).location(), e);
         }
+        ExceptionUtil.castAndThrow(e, RuntimeException.class);
+        return false;
       }
     }
 
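For readers unfamiliar with ExceptionUtil.castAndThrow: the new catch block
logs the file location and rethrows the original exception, instead of
wrapping it in a new RuntimeException whose message dumped the entire task
list. Roughly equivalent logic, as a sketch rather than the utility's exact
implementation:

    } catch (IOException | RuntimeException e) {
      if (currentTask != null && !currentTask.isDataTask()) {
        LOG.error("Error reading file: {}", getInputFile(currentTask).location(), e);
      }
      if (e instanceof RuntimeException) {
        throw (RuntimeException) e;  // rethrow unchecked exceptions unchanged
      }
      throw new RuntimeException(e); // wrap the checked IOException
    }

The trailing "return false;" only satisfies the compiler: castAndThrow's
signature does not tell javac that the call never returns normally.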
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java
index e395926..78c6f4d 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java
@@ -40,6 +40,8 @@ import org.apache.iceberg.types.Types;
  *     <li>{@link Types.BinaryType}</li>
  *     <li>{@link Types.TimestampType} (with and without timezone)</li>
  *     <li>{@link Types.DateType}</li>
+ *     <li>{@link Types.TimeType}</li>
+ *     <li>{@link Types.UUIDType}</li>
  *   </ul>
  */
 public class ColumnVector implements AutoCloseable {
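
For completeness, a sketch of reading values through ColumnVector's typed
getters (the getter and null-check methods are the ones on this class; the
ordinals and column types are hypothetical):

    ColumnVector dates = batch.column(0);  // an Iceberg date column
    ColumnVector names = batch.column(1);  // an Iceberg string column
    for (int row = 0; row < batch.numRows(); row++) {
      if (!dates.isNullAt(row)) {
        int daysSinceEpoch = dates.getInt(row);  // DATEDAY is an epoch-day int
      }
      String name = names.isNullAt(row) ? null : names.getString(row);
    }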