Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/03/21 02:11:58 UTC
[incubator-iceberg] branch master updated: Spark: Extract base data reader for vectorized reads (#853)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 1bc6455 Spark: Extract base data reader for vectorized reads (#853)
1bc6455 is described below
commit 1bc64553aa07d9a3bade5928e4fa4f5bc8bb8c4e
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Fri Mar 20 19:11:48 2020 -0700
Spark: Extract base data reader for vectorized reads (#853)
---
.../iceberg/spark/source/BaseDataReader.java | 110 +++++++
.../spark/source/PartitionRowConverter.java | 107 +++++++
.../org/apache/iceberg/spark/source/Reader.java | 322 +--------------------
.../apache/iceberg/spark/source/RowDataReader.java | 224 ++++++++++++++
4 files changed, 442 insertions(+), 321 deletions(-)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
new file mode 100644
index 0000000..1e3e07f
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.arrow.util.Preconditions;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+
+/**
+ * Base class for readers of type {@link InputPartitionReader} that read data as objects
+ * of type T.
+ *
+ * @param <T> the Java type returned by this reader; each instance contains one or more rows
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+abstract class BaseDataReader<T> implements InputPartitionReader<T> {
+ private final Iterator<FileScanTask> tasks;
+ private final FileIO fileIo;
+ private final Map<String, InputFile> inputFiles;
+
+ private Iterator<T> currentIterator;
+ Closeable currentCloseable;
+ private T current = null;
+
+ BaseDataReader(CombinedScanTask task, FileIO fileIo, EncryptionManager encryptionManager) {
+ this.fileIo = fileIo;
+ this.tasks = task.files().iterator();
+ Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(Iterables.transform(
+ task.files(),
+ fileScanTask ->
+ EncryptedFiles.encryptedInput(
+ this.fileIo.newInputFile(fileScanTask.file().path().toString()),
+ fileScanTask.file().keyMetadata())));
+ ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
+ decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
+ this.inputFiles = inputFileBuilder.build();
+ this.currentCloseable = CloseableIterable.empty();
+ this.currentIterator = Collections.emptyIterator();
+ }
+
+ @Override
+ public boolean next() throws IOException {
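+    // loop until a row is found in the current task, closing the current reader and
+    // opening the next task when the current one is exhausted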
+ while (true) {
+ if (currentIterator.hasNext()) {
+ this.current = currentIterator.next();
+ return true;
+ } else if (tasks.hasNext()) {
+ this.currentCloseable.close();
+ this.currentIterator = open(tasks.next());
+ } else {
+ return false;
+ }
+ }
+ }
+
+ @Override
+ public T get() {
+ return current;
+ }
+
+ abstract Iterator<T> open(FileScanTask task);
+
+ @Override
+ public void close() throws IOException {
+ InputFileBlockHolder.unset();
+
+ // close the current iterator
+ this.currentCloseable.close();
+
+ // exhaust the task iterator
+ while (tasks.hasNext()) {
+ tasks.next();
+ }
+ }
+
+ InputFile getInputFile(FileScanTask task) {
+ Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
+ return inputFiles.get(task.file().path().toString());
+ }
+}
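
BaseDataReader is a template: it owns task iteration, encrypted-input resolution, and
cleanup, while subclasses only supply open(FileScanTask). A minimal sketch of a subclass,
assuming a hypothetical readRows() helper in place of a real format-specific reader:

    // sketch only: readRows() is a hypothetical stand-in for the Parquet/Avro/ORC
    // builders that RowDataReader (below) actually uses
    class SketchDataReader extends BaseDataReader<InternalRow> {
      SketchDataReader(CombinedScanTask task, FileIO fileIo, EncryptionManager encryption) {
        super(task, fileIo, encryption);
      }

      @Override
      Iterator<InternalRow> open(FileScanTask task) {
        CloseableIterable<InternalRow> rows =
            readRows(getInputFile(task), task.start(), task.length());
        this.currentCloseable = rows; // the base class closes this before opening the next task
        return rows.iterator();
      }
    }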
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionRowConverter.java b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionRowConverter.java
new file mode 100644
index 0000000..13996f9
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionRowConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Converts an Iceberg partition tuple ({@link StructLike}) into an {@link InternalRow},
+ * using the partition schema passed during construction.
+ */
+class PartitionRowConverter implements Function<StructLike, InternalRow> {
+ private final DataType[] types;
+ private final int[] positions;
+ private final Class<?>[] javaTypes;
+ private final GenericInternalRow reusedRow;
+
+ PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
+ StructType partitionType = SparkSchemaUtil.convert(partitionSchema);
+ StructField[] fields = partitionType.fields();
+
+ this.types = new DataType[fields.length];
+ this.positions = new int[types.length];
+ this.javaTypes = new Class<?>[types.length];
+ this.reusedRow = new GenericInternalRow(types.length);
+
+ List<PartitionField> partitionFields = spec.fields();
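+    // for each partition column, record the position and Java class of the
+    // identity-transform field in the spec that produces it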
+ for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) {
+ this.types[rowIndex] = fields[rowIndex].dataType();
+
+ int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
+ for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) {
+ PartitionField field = spec.fields().get(specIndex);
+ if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) {
+ positions[rowIndex] = specIndex;
+ javaTypes[rowIndex] = spec.javaClasses()[specIndex];
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public InternalRow apply(StructLike tuple) {
+ for (int i = 0; i < types.length; i += 1) {
+ Object value = tuple.get(positions[i], javaTypes[i]);
+ if (value != null) {
+ reusedRow.update(i, convert(value, types[i]));
+ } else {
+ reusedRow.setNullAt(i);
+ }
+ }
+
+ return reusedRow;
+ }
+
+  /**
+   * Converts a value to the representation expected by Spark's InternalRow.
+   *
+   * @param value a data value
+   * @param type the Spark data type
+   * @return the value converted for use in Spark's InternalRow
+   */
+ private static Object convert(Object value, DataType type) {
+ if (type instanceof StringType) {
+ return UTF8String.fromString(value.toString());
+ } else if (type instanceof BinaryType) {
+ return ByteBuffers.toByteArray((ByteBuffer) value);
+ } else if (type instanceof DecimalType) {
+ return Decimal.fromDecimal(value);
+ }
+ return value;
+ }
+}
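
In RowDataReader (added below), this converter supplies identity partition values that are
joined onto rows read from the data file. A usage sketch; partitionSchema, spec, and
dataFile are stand-ins assumed to be in scope, not names from this commit:

    // sketch: convert a file's partition tuple to a Spark row
    PartitionRowConverter toRow = new PartitionRowConverter(partitionSchema, spec);
    InternalRow partitionRow = toRow.apply(dataFile.partition());
    // the converter updates one GenericInternalRow in place across calls,
    // so copy the result before holding onto it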
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 51f6979..ea189d3 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -19,69 +19,38 @@
package org.apache.iceberg.spark.source;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
-import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import java.util.Locale;
-import java.util.Map;
import java.util.Set;
-import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.common.DynMethods;
-import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.data.SparkAvroReader;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.spark.sql.catalyst.expressions.AttributeReference;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.catalyst.expressions.JoinedRow;
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
@@ -91,17 +60,11 @@ import org.apache.spark.sql.sources.v2.reader.Statistics;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
-import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns,
SupportsReportStatistics {
@@ -361,7 +324,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
@Override
public InputPartitionReader<InternalRow> createPartitionReader() {
- return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), io.value(),
+ return new RowDataReader(task, lazyTableSchema(), lazyExpectedSchema(), io.value(),
encryptionManager.value(), caseSensitive);
}
@@ -407,289 +370,6 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
}
}
- private static class TaskDataReader implements InputPartitionReader<InternalRow> {
- // for some reason, the apply method can't be called from Java without reflection
- private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
- .impl(UnsafeProjection.class, InternalRow.class)
- .build();
-
- private final Iterator<FileScanTask> tasks;
- private final Schema tableSchema;
- private final Schema expectedSchema;
- private final FileIO fileIo;
- private final Map<String, InputFile> inputFiles;
- private final boolean caseSensitive;
-
- private Iterator<InternalRow> currentIterator = null;
- private Closeable currentCloseable = null;
- private InternalRow current = null;
-
- TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo,
- EncryptionManager encryptionManager, boolean caseSensitive) {
- this.fileIo = fileIo;
- this.tasks = task.files().iterator();
- this.tableSchema = tableSchema;
- this.expectedSchema = expectedSchema;
- Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(Iterables.transform(task.files(),
- fileScanTask ->
- EncryptedFiles.encryptedInput(
- this.fileIo.newInputFile(fileScanTask.file().path().toString()),
- fileScanTask.file().keyMetadata())));
- ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
- decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
- this.inputFiles = inputFileBuilder.build();
- // open last because the schemas and fileIo must be set
- this.currentIterator = open(tasks.next());
- this.caseSensitive = caseSensitive;
- }
-
- @Override
- public boolean next() throws IOException {
- while (true) {
- if (currentIterator.hasNext()) {
- this.current = currentIterator.next();
- return true;
-
- } else if (tasks.hasNext()) {
- this.currentCloseable.close();
- this.currentIterator = open(tasks.next());
-
- } else {
- return false;
- }
- }
- }
-
- @Override
- public InternalRow get() {
- return current;
- }
-
- @Override
- public void close() throws IOException {
- InputFileBlockHolder.unset();
-
- // close the current iterator
- this.currentCloseable.close();
-
- // exhaust the task iterator
- while (tasks.hasNext()) {
- tasks.next();
- }
- }
-
- private Iterator<InternalRow> open(FileScanTask task) {
- DataFile file = task.file();
-
- // update the current file for Spark's filename() function
- InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
-
- // schema or rows returned by readers
- Schema finalSchema = expectedSchema;
- PartitionSpec spec = task.spec();
- Set<Integer> idColumns = spec.identitySourceIds();
-
- // schema needed for the projection and filtering
- StructType sparkType = SparkSchemaUtil.convert(finalSchema);
- Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
- boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
- boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
-
- Schema iterSchema;
- Iterator<InternalRow> iter;
-
- if (hasJoinedPartitionColumns) {
- // schema used to read data files
- Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
- Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
- PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
- JoinedRow joined = new JoinedRow();
-
- InternalRow partition = convertToRow.apply(file.partition());
- joined.withRight(partition);
-
- // create joined rows and project from the joined schema to the final schema
- iterSchema = TypeUtil.join(readSchema, partitionSchema);
- iter = Iterators.transform(open(task, readSchema), joined::withLeft);
-
- } else if (hasExtraFilterColumns) {
- // add projection to the final schema
- iterSchema = requiredSchema;
- iter = open(task, requiredSchema);
-
- } else {
- // return the base iterator
- iterSchema = finalSchema;
- iter = open(task, finalSchema);
- }
-
- // TODO: remove the projection by reporting the iterator's schema back to Spark
- return Iterators.transform(iter,
- APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
- }
-
- private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
- CloseableIterable<InternalRow> iter;
- if (task.isDataTask()) {
- iter = newDataIterable(task.asDataTask(), readSchema);
-
- } else {
- InputFile location = inputFiles.get(task.file().path().toString());
- Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
-
- switch (task.file().format()) {
- case PARQUET:
- iter = newParquetIterable(location, task, readSchema);
- break;
-
- case AVRO:
- iter = newAvroIterable(location, task, readSchema);
- break;
-
- case ORC:
- iter = newOrcIterable(location, task, readSchema);
- break;
-
- default:
- throw new UnsupportedOperationException(
- "Cannot read unknown format: " + task.file().format());
- }
- }
-
- this.currentCloseable = iter;
-
- return iter.iterator();
- }
-
- private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) {
- StructType struct = SparkSchemaUtil.convert(readSchema);
-
- List<AttributeReference> refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava();
- List<Attribute> attrs = Lists.newArrayListWithExpectedSize(struct.fields().length);
- List<org.apache.spark.sql.catalyst.expressions.Expression> exprs =
- Lists.newArrayListWithExpectedSize(struct.fields().length);
-
- for (AttributeReference ref : refs) {
- attrs.add(ref.toAttribute());
- }
-
- for (Types.NestedField field : finalSchema.columns()) {
- int indexInReadSchema = struct.fieldIndex(field.name());
- exprs.add(refs.get(indexInReadSchema));
- }
-
- return UnsafeProjection.create(
- JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
- JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
- }
-
- private CloseableIterable<InternalRow> newAvroIterable(InputFile location,
- FileScanTask task,
- Schema readSchema) {
- return Avro.read(location)
- .reuseContainers()
- .project(readSchema)
- .split(task.start(), task.length())
- .createReaderFunc(SparkAvroReader::new)
- .build();
- }
-
- private CloseableIterable<InternalRow> newParquetIterable(InputFile location,
- FileScanTask task,
- Schema readSchema) {
- return Parquet.read(location)
- .project(readSchema)
- .split(task.start(), task.length())
- .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
- .filter(task.residual())
- .caseSensitive(caseSensitive)
- .build();
- }
-
- private CloseableIterable<InternalRow> newOrcIterable(InputFile location,
- FileScanTask task,
- Schema readSchema) {
- return ORC.read(location)
- .schema(readSchema)
- .split(task.start(), task.length())
- .createReaderFunc(SparkOrcReader::new)
- .caseSensitive(caseSensitive)
- .build();
- }
-
- private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
- StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
- CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
- task.asDataTask().rows(), row::setStruct);
- return CloseableIterable.transform(
- asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
- }
- }
-
- private static class PartitionRowConverter implements Function<StructLike, InternalRow> {
- private final DataType[] types;
- private final int[] positions;
- private final Class<?>[] javaTypes;
- private final GenericInternalRow reusedRow;
-
- PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
- StructType partitionType = SparkSchemaUtil.convert(partitionSchema);
- StructField[] fields = partitionType.fields();
-
- this.types = new DataType[fields.length];
- this.positions = new int[types.length];
- this.javaTypes = new Class<?>[types.length];
- this.reusedRow = new GenericInternalRow(types.length);
-
- List<PartitionField> partitionFields = spec.fields();
- for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) {
- this.types[rowIndex] = fields[rowIndex].dataType();
-
- int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
- for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) {
- PartitionField field = spec.fields().get(specIndex);
- if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) {
- positions[rowIndex] = specIndex;
- javaTypes[rowIndex] = spec.javaClasses()[specIndex];
- break;
- }
- }
- }
- }
-
- @Override
- public InternalRow apply(StructLike tuple) {
- for (int i = 0; i < types.length; i += 1) {
- Object value = tuple.get(positions[i], javaTypes[i]);
- if (value != null) {
- reusedRow.update(i, convert(value, types[i]));
- } else {
- reusedRow.setNullAt(i);
- }
- }
-
- return reusedRow;
- }
-
- /**
- * Converts the objects into instances used by Spark's InternalRow.
- *
- * @param value a data value
- * @param type the Spark data type
- * @return the value converted to the representation expected by Spark's InternalRow.
- */
- private static Object convert(Object value, DataType type) {
- if (type instanceof StringType) {
- return UTF8String.fromString(value.toString());
- } else if (type instanceof BinaryType) {
- return ByteBuffers.toByteArray((ByteBuffer) value);
- } else if (type instanceof DecimalType) {
- return Decimal.fromDecimal(value);
- }
- return value;
- }
- }
-
private static class StructLikeInternalRow implements StructLike {
private final DataType[] types;
private InternalRow row = null;
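
With TaskDataReader removed, the read task now constructs a RowDataReader, and the generic
base class leaves room for a vectorized variant that returns batches instead of rows. A
speculative sketch of such a counterpart; BatchDataReader and everything in it are
assumptions, not part of this commit:

    import java.util.Iterator;
    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.encryption.EncryptionManager;
    import org.apache.iceberg.io.FileIO;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    // speculative: BaseDataReader<T> is generic, so a vectorized reader only has
    // to produce Iterator<ColumnarBatch>
    class BatchDataReader extends BaseDataReader<ColumnarBatch> {
      BatchDataReader(CombinedScanTask task, FileIO fileIo, EncryptionManager encryption) {
        super(task, fileIo, encryption);
      }

      @Override
      Iterator<ColumnarBatch> open(FileScanTask task) {
        // a real implementation would build a columnar reader for task.file()
        throw new UnsupportedOperationException("not implemented in this sketch");
      }
    }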
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
new file mode 100644
index 0000000..ff5efea
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.SparkAvroReader;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.catalyst.expressions.AttributeReference;
+import org.apache.spark.sql.catalyst.expressions.JoinedRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.types.StructType;
+import scala.collection.JavaConverters;
+
+class RowDataReader extends BaseDataReader<InternalRow> {
+ // for some reason, the apply method can't be called from Java without reflection
+ private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
+ .impl(UnsafeProjection.class, InternalRow.class)
+ .build();
+
+ private final Schema tableSchema;
+ private final Schema expectedSchema;
+ private final boolean caseSensitive;
+
+ RowDataReader(
+ CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo,
+ EncryptionManager encryptionManager, boolean caseSensitive) {
+ super(task, fileIo, encryptionManager);
+ this.tableSchema = tableSchema;
+ this.expectedSchema = expectedSchema;
+ this.caseSensitive = caseSensitive;
+ }
+
+ @Override
+ Iterator<InternalRow> open(FileScanTask task) {
+ DataFile file = task.file();
+
+ // update the current file for Spark's filename() function
+ InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
+
+ // schema or rows returned by readers
+ Schema finalSchema = expectedSchema;
+ PartitionSpec spec = task.spec();
+ Set<Integer> idColumns = spec.identitySourceIds();
+
+ // schema needed for the projection and filtering
+ StructType sparkType = SparkSchemaUtil.convert(finalSchema);
+ Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
+ boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
+ boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
+
+ Schema iterSchema;
+ Iterator<InternalRow> iter;
+
+ if (hasJoinedPartitionColumns) {
+ // schema used to read data files
+ Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
+ Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
+ PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
+ JoinedRow joined = new JoinedRow();
+
+ InternalRow partition = convertToRow.apply(file.partition());
+ joined.withRight(partition);
+
+ // create joined rows and project from the joined schema to the final schema
+ iterSchema = TypeUtil.join(readSchema, partitionSchema);
+ iter = Iterators.transform(open(task, readSchema), joined::withLeft);
+ } else if (hasExtraFilterColumns) {
+ // add projection to the final schema
+ iterSchema = requiredSchema;
+ iter = open(task, requiredSchema);
+ } else {
+ // return the base iterator
+ iterSchema = finalSchema;
+ iter = open(task, finalSchema);
+ }
+
+ // TODO: remove the projection by reporting the iterator's schema back to Spark
+ return Iterators.transform(
+ iter,
+ APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
+ }
+
+ private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
+ CloseableIterable<InternalRow> iter;
+ if (task.isDataTask()) {
+ iter = newDataIterable(task.asDataTask(), readSchema);
+ } else {
+ InputFile location = getInputFile(task);
+ Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
+
+ switch (task.file().format()) {
+ case PARQUET:
+ iter = newParquetIterable(location, task, readSchema);
+ break;
+
+ case AVRO:
+ iter = newAvroIterable(location, task, readSchema);
+ break;
+
+ case ORC:
+ iter = newOrcIterable(location, task, readSchema);
+ break;
+
+ default:
+ throw new UnsupportedOperationException(
+ "Cannot read unknown format: " + task.file().format());
+ }
+ }
+
+ this.currentCloseable = iter;
+
+ return iter.iterator();
+ }
+
+ private CloseableIterable<InternalRow> newAvroIterable(
+ InputFile location,
+ FileScanTask task,
+ Schema readSchema) {
+ return Avro.read(location)
+ .reuseContainers()
+ .project(readSchema)
+ .split(task.start(), task.length())
+ .createReaderFunc(SparkAvroReader::new)
+ .build();
+ }
+
+ private CloseableIterable<InternalRow> newParquetIterable(
+ InputFile location,
+ FileScanTask task,
+ Schema readSchema) {
+ return Parquet.read(location)
+ .project(readSchema)
+ .split(task.start(), task.length())
+ .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
+ .filter(task.residual())
+ .caseSensitive(caseSensitive)
+ .build();
+ }
+
+ private CloseableIterable<InternalRow> newOrcIterable(
+ InputFile location,
+ FileScanTask task,
+ Schema readSchema) {
+ return ORC.read(location)
+ .schema(readSchema)
+ .split(task.start(), task.length())
+ .createReaderFunc(SparkOrcReader::new)
+ .caseSensitive(caseSensitive)
+ .build();
+ }
+
+ private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
+ StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
+ CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
+ task.asDataTask().rows(), row::setStruct);
+ return CloseableIterable.transform(
+ asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
+ }
+
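+  // builds a projection that reorders readSchema's columns into finalSchema's order;
+  // readSchema must contain every column of finalSchema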
+ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) {
+ StructType struct = SparkSchemaUtil.convert(readSchema);
+
+ List<AttributeReference> refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava();
+ List<Attribute> attrs = Lists.newArrayListWithExpectedSize(struct.fields().length);
+ List<org.apache.spark.sql.catalyst.expressions.Expression> exprs =
+ Lists.newArrayListWithExpectedSize(struct.fields().length);
+
+ for (AttributeReference ref : refs) {
+ attrs.add(ref.toAttribute());
+ }
+
+ for (Types.NestedField field : finalSchema.columns()) {
+ int indexInReadSchema = struct.fieldIndex(field.name());
+ exprs.add(refs.get(indexInReadSchema));
+ }
+
+ return UnsafeProjection.create(
+ JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
+ JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
+ }
+}
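
For context, Spark's DataSourceV2 runtime drives these readers through the
InputPartitionReader contract: next() advances, get() returns the current value, and
close() releases resources. A sketch of that consumption loop; task, tableSchema,
expectedSchema, io, and encryption are assumed to be in scope, and this is an
illustration of the contract rather than Spark's actual executor code:

    try (InputPartitionReader<InternalRow> reader = new RowDataReader(
        task, tableSchema, expectedSchema, io, encryption, true /* caseSensitive */)) {
      while (reader.next()) {
        InternalRow row = reader.get();
        // consume row here; copy it if it must outlive the next call to next(),
        // since readers may reuse row objects
      }
    }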