You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/09/17 18:36:55 UTC
[iceberg] branch master updated: Arrow: Fix assertions and correctness checks (#2933)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f220f25 Arrow: Fix assertions and correctness checks (#2933)
f220f25 is described below
commit f220f2534f3ae6e77c4ab9cae2ebe02185655d6a
Author: mayursrivastava <ma...@twosigma.com>
AuthorDate: Fri Sep 17 14:36:45 2021 -0400
Arrow: Fix assertions and correctness checks (#2933)
---
.../iceberg/arrow/vectorized/ArrowReader.java | 68 +++++++++++++++++-----
.../iceberg/arrow/vectorized/ColumnVector.java | 2 +
2 files changed, 56 insertions(+), 14 deletions(-)
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
index 2386ce5..f09774e 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
@@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@@ -49,9 +50,16 @@ import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ExceptionUtil;
+import org.apache.iceberg.util.TableScanUtil;
import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Vectorized reader that returns an iterator of {@link ColumnarBatch}.
@@ -70,6 +78,8 @@ import org.apache.parquet.schema.MessageType;
* Arrow: {@link MinorType#TIMEMICRO}</li>
* <li>Iceberg: {@link Types.BinaryType}, Arrow: {@link MinorType#VARBINARY}</li>
* <li>Iceberg: {@link Types.DateType}, Arrow: {@link MinorType#DATEDAY}</li>
+ * <li>Iceberg: {@link Types.TimeType}, Arrow: {@link MinorType#TIMEMICRO}</li>
+ * <li>Iceberg: {@link Types.UUIDType}, Arrow: {@link MinorType#FIXEDSIZEBINARY}(16)</li>
* </ul>
*
* <p>Features that don't work in this implementation:
@@ -84,11 +94,26 @@ import org.apache.parquet.schema.MessageType;
* {@link Types.StructType}, {@link Types.FixedType} and
* {@link Types.DecimalType}
* See https://github.com/apache/iceberg/issues/2485 and https://github.com/apache/iceberg/issues/2486.</li>
- * <li>Iceberg v2 spec is not supported.
+ * <li>Delete files are not supported.
* See https://github.com/apache/iceberg/issues/2487.</li>
* </ul>
*/
public class ArrowReader extends CloseableGroup {
+ private static final Logger LOG = LoggerFactory.getLogger(ArrowReader.class);
+
+ private static final Set<TypeID> SUPPORTED_TYPES = ImmutableSet.of(
+ TypeID.BOOLEAN,
+ TypeID.INTEGER,
+ TypeID.LONG,
+ TypeID.FLOAT,
+ TypeID.DOUBLE,
+ TypeID.STRING,
+ TypeID.TIMESTAMP,
+ TypeID.BINARY,
+ TypeID.DATE,
+ TypeID.UUID,
+ TypeID.TIME
+ );
private final Schema schema;
private final FileIO io;
@@ -136,6 +161,14 @@ public class ArrowReader extends CloseableGroup {
* previous {@link ColumnarBatch} may be reused for the next {@link ColumnarBatch}.
* This implies that the caller should either use the {@link ColumnarBatch} or deep copy the
* {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+ * <p>
+ * This method works only when the following conditions are true:
+ * <ol>
+ * <li>At least one column is queried,</li>
+ * <li>There are no delete files, and</li>
+ * <li>Supported data types are queried (see {@link #SUPPORTED_TYPES}).</li>
+ * </ol>
+ * When any of these conditions fail, an {@link UnsupportedOperationException} is thrown.
*/
public CloseableIterator<ColumnarBatch> open(CloseableIterable<CombinedScanTask> tasks) {
CloseableIterator<ColumnarBatch> itr = new VectorizedCombinedScanIterator(
@@ -163,7 +196,6 @@ public class ArrowReader extends CloseableGroup {
*/
private static final class VectorizedCombinedScanIterator implements CloseableIterator<ColumnarBatch> {
- private final List<FileScanTask> fileTasks;
private final Iterator<FileScanTask> fileItr;
private final Map<String, InputFile> inputFiles;
private final Schema expectedSchema;
@@ -206,15 +238,30 @@ public class ArrowReader extends CloseableGroup {
boolean caseSensitive,
int batchSize,
boolean reuseContainers) {
- this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+ List<FileScanTask> fileTasks = StreamSupport.stream(tasks.spliterator(), false)
.map(CombinedScanTask::files)
.flatMap(Collection::stream)
.collect(Collectors.toList());
this.fileItr = fileTasks.iterator();
+ if (fileTasks.stream().anyMatch(TableScanUtil::hasDeletes)) {
+ throw new UnsupportedOperationException("Cannot read files that require applying delete files");
+ }
+
+ if (expectedSchema.columns().isEmpty()) {
+ throw new UnsupportedOperationException("Cannot read without at least one projected column");
+ }
+
+ Set<TypeID> unsupportedTypes = Sets.difference(
+ expectedSchema.columns().stream().map(c -> c.type().typeId()).collect(Collectors.toSet()),
+ SUPPORTED_TYPES);
+ if (!unsupportedTypes.isEmpty()) {
+ throw new UnsupportedOperationException("Cannot read unsupported column types: " + unsupportedTypes);
+ }
+
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
fileTasks.stream()
- .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+ .map(FileScanTask::file)
.forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
@@ -252,17 +299,10 @@ public class ArrowReader extends CloseableGroup {
}
} catch (IOException | RuntimeException e) {
if (currentTask != null && !currentTask.isDataTask()) {
- throw new RuntimeException(
- "Error reading file: " + getInputFile(currentTask).location() +
- ". Reason: the current task is not a data task, i.e. it cannot read data rows. " +
- "Ensure that the tasks passed to the constructor are data tasks. " +
- "The file scan tasks are: " + fileTasks,
- e);
- } else {
- throw new RuntimeException(
- "An error occurred while iterating through the file scan tasks or closing the iterator," +
- " see the stacktrace for further information. The file scan tasks are: " + fileTasks, e);
+ LOG.error("Error reading file: {}", getInputFile(currentTask).location(), e);
}
+ ExceptionUtil.castAndThrow(e, RuntimeException.class);
+ return false;
}
}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java
index e395926..78c6f4d 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java
@@ -40,6 +40,8 @@ import org.apache.iceberg.types.Types;
* <li>{@link Types.BinaryType}</li>
* <li>{@link Types.TimestampType} (with and without timezone)</li>
* <li>{@link Types.DateType}</li>
+ * <li>{@link Types.TimeType}</li>
+ * <li>{@link Types.UUIDType}</li>
* </ul>
*/
public class ColumnVector implements AutoCloseable {