Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/03/21 02:11:58 UTC

[incubator-iceberg] branch master updated: Spark: Extract base data reader for vectorized reads (#853)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bc6455  Spark: Extract base data reader for vectorized reads (#853)
1bc6455 is described below

commit 1bc64553aa07d9a3bade5928e4fa4f5bc8bb8c4e
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Fri Mar 20 19:11:48 2020 -0700

    Spark: Extract base data reader for vectorized reads (#853)
---
 .../iceberg/spark/source/BaseDataReader.java       | 110 +++++++
 .../spark/source/PartitionRowConverter.java        | 107 +++++++
 .../org/apache/iceberg/spark/source/Reader.java    | 322 +--------------------
 .../apache/iceberg/spark/source/RowDataReader.java | 224 ++++++++++++++
 4 files changed, 442 insertions(+), 321 deletions(-)
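
The refactor pulls the per-task plumbing (decrypted file lookup, iterator and Closeable bookkeeping, the next()/get()/close() loop) out of Reader's inner TaskDataReader into a reusable BaseDataReader, keeps the row-level logic in the new RowDataReader, and promotes PartitionRowConverter to its own class so a future vectorized reader can share the same base. For orientation, the loop below is a simplified sketch of the contract Spark's DataSourceV2 runtime expects of any such reader; the `partition` parameter and the `consume` helper are illustrative, not code from this commit.

    import java.io.IOException;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.sources.v2.reader.InputPartition;
    import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;

    class ReaderContractSketch {
      // Simplified sketch of how Spark drives an InputPartitionReader; BaseDataReader
      // (added below) implements exactly this next()/get()/close() contract.
      static void consume(InputPartition<InternalRow> partition) throws IOException {
        InputPartitionReader<InternalRow> reader = partition.createPartitionReader();
        try {
          while (reader.next()) {            // advance within the current file, or open the next task
            InternalRow row = reader.get();  // current record; valid until the next call to next()
            // ... hand `row` to Spark ...
          }
        } finally {
          reader.close();                    // close the current file and drop any remaining tasks
        }
      }
    }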

diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
new file mode 100644
index 0000000..1e3e07f
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.arrow.util.Preconditions;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+
+/**
+ * Base class for {@link InputPartitionReader} implementations that read data as objects of type &lt;T&gt;.
+ *
+ * @param <T> is the Java class returned by this reader whose objects contain one or more rows.
+ */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+abstract class BaseDataReader<T> implements InputPartitionReader<T> {
+  private final Iterator<FileScanTask> tasks;
+  private final FileIO fileIo;
+  private final Map<String, InputFile> inputFiles;
+
+  private Iterator<T> currentIterator;
+  Closeable currentCloseable;
+  private T current = null;
+
+  BaseDataReader(CombinedScanTask task, FileIO fileIo, EncryptionManager encryptionManager) {
+    this.fileIo = fileIo;
+    this.tasks = task.files().iterator();
+    Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(Iterables.transform(
+        task.files(),
+        fileScanTask ->
+            EncryptedFiles.encryptedInput(
+                this.fileIo.newInputFile(fileScanTask.file().path().toString()),
+                fileScanTask.file().keyMetadata())));
+    ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
+    decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
+    this.inputFiles = inputFileBuilder.build();
+    this.currentCloseable = CloseableIterable.empty();
+    this.currentIterator = Collections.emptyIterator();
+  }
+
+  @Override
+  public boolean next() throws IOException {
+    while (true) {
+      if (currentIterator.hasNext()) {
+        this.current = currentIterator.next();
+        return true;
+      } else if (tasks.hasNext()) {
+        this.currentCloseable.close();
+        this.currentIterator = open(tasks.next());
+      } else {
+        return false;
+      }
+    }
+  }
+
+  @Override
+  public T get() {
+    return current;
+  }
+
+  abstract Iterator<T> open(FileScanTask task);
+
+  @Override
+  public void close() throws IOException {
+    InputFileBlockHolder.unset();
+
+    // close the current iterator
+    this.currentCloseable.close();
+
+    // exhaust the task iterator
+    while (tasks.hasNext()) {
+      tasks.next();
+    }
+  }
+
+  InputFile getInputFile(FileScanTask task) {
+    Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
+    return inputFiles.get(task.file().path().toString());
+  }
+}
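
This commit only extracts the base class; no vectorized reader is included yet. Purely as an illustration of how one would plug in, a batch-returning subclass might look roughly like the sketch below. BatchDataReader and newBatchIterable() are placeholders invented for this sketch, not existing classes or APIs.

    package org.apache.iceberg.spark.source;  // same package, so BaseDataReader's package-private members are visible

    import java.util.Iterator;
    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.encryption.EncryptionManager;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.InputFile;
    import org.apache.spark.rdd.InputFileBlockHolder;
    import org.apache.spark.sql.vectorized.ColumnarBatch;

    // Hypothetical sketch only: nothing like this class exists in this commit.
    class BatchDataReader extends BaseDataReader<ColumnarBatch> {
      private final Schema expectedSchema;

      BatchDataReader(CombinedScanTask task, Schema expectedSchema, FileIO fileIo,
                      EncryptionManager encryptionManager) {
        super(task, fileIo, encryptionManager);
        this.expectedSchema = expectedSchema;
      }

      @Override
      Iterator<ColumnarBatch> open(FileScanTask task) {
        // update the current file for Spark's filename() function, as RowDataReader does
        InputFileBlockHolder.set(task.file().path().toString(), task.start(), task.length());
        CloseableIterable<ColumnarBatch> batches = newBatchIterable(getInputFile(task), task, expectedSchema);
        this.currentCloseable = batches;  // BaseDataReader closes this before opening the next task
        return batches.iterator();
      }

      // placeholder: a real implementation would build a vectorized Parquet/ORC reader here
      private CloseableIterable<ColumnarBatch> newBatchIterable(InputFile file, FileScanTask task, Schema schema) {
        throw new UnsupportedOperationException("illustration only");
      }
    }
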
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionRowConverter.java b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionRowConverter.java
new file mode 100644
index 0000000..13996f9
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionRowConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Objects of this class generate an {@link InternalRow} by utilizing the partition schema passed during construction.
+ */
+class PartitionRowConverter implements Function<StructLike, InternalRow> {
+  private final DataType[] types;
+  private final int[] positions;
+  private final Class<?>[] javaTypes;
+  private final GenericInternalRow reusedRow;
+
+  PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
+    StructType partitionType = SparkSchemaUtil.convert(partitionSchema);
+    StructField[] fields = partitionType.fields();
+
+    this.types = new DataType[fields.length];
+    this.positions = new int[types.length];
+    this.javaTypes = new Class<?>[types.length];
+    this.reusedRow = new GenericInternalRow(types.length);
+
+    List<PartitionField> partitionFields = spec.fields();
+    for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) {
+      this.types[rowIndex] = fields[rowIndex].dataType();
+
+      int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
+      for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) {
+        PartitionField field = spec.fields().get(specIndex);
+        if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) {
+          positions[rowIndex] = specIndex;
+          javaTypes[rowIndex] = spec.javaClasses()[specIndex];
+          break;
+        }
+      }
+    }
+  }
+
+  @Override
+  public InternalRow apply(StructLike tuple) {
+    for (int i = 0; i < types.length; i += 1) {
+      Object value = tuple.get(positions[i], javaTypes[i]);
+      if (value != null) {
+        reusedRow.update(i, convert(value, types[i]));
+      } else {
+        reusedRow.setNullAt(i);
+      }
+    }
+
+    return reusedRow;
+  }
+
+  /**
+   * Converts the objects into instances used by Spark's InternalRow.
+   *
+   * @param value a data value
+   * @param type  the Spark data type
+   * @return the value converted to the representation expected by Spark's InternalRow.
+   */
+  private static Object convert(Object value, DataType type) {
+    if (type instanceof StringType) {
+      return UTF8String.fromString(value.toString());
+    } else if (type instanceof BinaryType) {
+      return ByteBuffers.toByteArray((ByteBuffer) value);
+    } else if (type instanceof DecimalType) {
+      return Decimal.fromDecimal(value);
+    }
+    return value;
+  }
+}
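
PartitionRowConverter was previously a private inner class of Reader (removed in the Reader.java diff below) and is used by RowDataReader to join identity-partition values back onto rows. A minimal usage sketch, assuming a FileScanTask `task` and the pruned read schema `requiredSchema` are in scope:

    // Mirrors the hasJoinedPartitionColumns branch in RowDataReader.open() below.
    PartitionSpec spec = task.spec();
    Schema partitionSchema = TypeUtil.select(requiredSchema, spec.identitySourceIds());
    PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
    InternalRow partition = convertToRow.apply(task.file().partition());
    // note: the converter reuses a single GenericInternalRow, so copy the result
    // if it must outlive the next call to apply()
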
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 51f6979..ea189d3 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -19,69 +19,38 @@
 
 package org.apache.iceberg.spark.source;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.common.DynMethods;
-import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.data.SparkAvroReader;
-import org.apache.iceberg.spark.data.SparkOrcReader;
-import org.apache.iceberg.spark.data.SparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ByteBuffers;
 import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.spark.sql.catalyst.expressions.AttributeReference;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.catalyst.expressions.JoinedRow;
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
@@ -91,17 +60,11 @@ import org.apache.spark.sql.sources.v2.reader.Statistics;
 import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
 import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
 import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
-import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.StringType;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.UTF8String;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConverters;
 
 class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns,
     SupportsReportStatistics {
@@ -361,7 +324,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
 
     @Override
     public InputPartitionReader<InternalRow> createPartitionReader() {
-      return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), io.value(),
+      return new RowDataReader(task, lazyTableSchema(), lazyExpectedSchema(), io.value(),
         encryptionManager.value(), caseSensitive);
     }
 
@@ -407,289 +370,6 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
     }
   }
 
-  private static class TaskDataReader implements InputPartitionReader<InternalRow> {
-    // for some reason, the apply method can't be called from Java without reflection
-    private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
-        .impl(UnsafeProjection.class, InternalRow.class)
-        .build();
-
-    private final Iterator<FileScanTask> tasks;
-    private final Schema tableSchema;
-    private final Schema expectedSchema;
-    private final FileIO fileIo;
-    private final Map<String, InputFile> inputFiles;
-    private final boolean caseSensitive;
-
-    private Iterator<InternalRow> currentIterator = null;
-    private Closeable currentCloseable = null;
-    private InternalRow current = null;
-
-    TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo,
-                   EncryptionManager encryptionManager, boolean caseSensitive) {
-      this.fileIo = fileIo;
-      this.tasks = task.files().iterator();
-      this.tableSchema = tableSchema;
-      this.expectedSchema = expectedSchema;
-      Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(Iterables.transform(task.files(),
-          fileScanTask ->
-              EncryptedFiles.encryptedInput(
-                  this.fileIo.newInputFile(fileScanTask.file().path().toString()),
-                  fileScanTask.file().keyMetadata())));
-      ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
-      decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
-      this.inputFiles = inputFileBuilder.build();
-      // open last because the schemas and fileIo must be set
-      this.currentIterator = open(tasks.next());
-      this.caseSensitive = caseSensitive;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-      while (true) {
-        if (currentIterator.hasNext()) {
-          this.current = currentIterator.next();
-          return true;
-
-        } else if (tasks.hasNext()) {
-          this.currentCloseable.close();
-          this.currentIterator = open(tasks.next());
-
-        } else {
-          return false;
-        }
-      }
-    }
-
-    @Override
-    public InternalRow get() {
-      return current;
-    }
-
-    @Override
-    public void close() throws IOException {
-      InputFileBlockHolder.unset();
-
-      // close the current iterator
-      this.currentCloseable.close();
-
-      // exhaust the task iterator
-      while (tasks.hasNext()) {
-        tasks.next();
-      }
-    }
-
-    private Iterator<InternalRow> open(FileScanTask task) {
-      DataFile file = task.file();
-
-      // update the current file for Spark's filename() function
-      InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
-
-      // schema or rows returned by readers
-      Schema finalSchema = expectedSchema;
-      PartitionSpec spec = task.spec();
-      Set<Integer> idColumns = spec.identitySourceIds();
-
-      // schema needed for the projection and filtering
-      StructType sparkType = SparkSchemaUtil.convert(finalSchema);
-      Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
-      boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
-      boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
-
-      Schema iterSchema;
-      Iterator<InternalRow> iter;
-
-      if (hasJoinedPartitionColumns) {
-        // schema used to read data files
-        Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
-        Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
-        PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
-        JoinedRow joined = new JoinedRow();
-
-        InternalRow partition = convertToRow.apply(file.partition());
-        joined.withRight(partition);
-
-        // create joined rows and project from the joined schema to the final schema
-        iterSchema = TypeUtil.join(readSchema, partitionSchema);
-        iter = Iterators.transform(open(task, readSchema), joined::withLeft);
-
-      } else if (hasExtraFilterColumns) {
-        // add projection to the final schema
-        iterSchema = requiredSchema;
-        iter = open(task, requiredSchema);
-
-      } else {
-        // return the base iterator
-        iterSchema = finalSchema;
-        iter = open(task, finalSchema);
-      }
-
-      // TODO: remove the projection by reporting the iterator's schema back to Spark
-      return Iterators.transform(iter,
-          APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
-    }
-
-    private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
-      CloseableIterable<InternalRow> iter;
-      if (task.isDataTask()) {
-        iter = newDataIterable(task.asDataTask(), readSchema);
-
-      } else {
-        InputFile location = inputFiles.get(task.file().path().toString());
-        Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
-
-        switch (task.file().format()) {
-          case PARQUET:
-            iter = newParquetIterable(location, task, readSchema);
-            break;
-
-          case AVRO:
-            iter = newAvroIterable(location, task, readSchema);
-            break;
-
-          case ORC:
-            iter = newOrcIterable(location, task, readSchema);
-            break;
-
-          default:
-            throw new UnsupportedOperationException(
-                "Cannot read unknown format: " + task.file().format());
-        }
-      }
-
-      this.currentCloseable = iter;
-
-      return iter.iterator();
-    }
-
-    private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) {
-      StructType struct = SparkSchemaUtil.convert(readSchema);
-
-      List<AttributeReference> refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava();
-      List<Attribute> attrs = Lists.newArrayListWithExpectedSize(struct.fields().length);
-      List<org.apache.spark.sql.catalyst.expressions.Expression> exprs =
-          Lists.newArrayListWithExpectedSize(struct.fields().length);
-
-      for (AttributeReference ref : refs) {
-        attrs.add(ref.toAttribute());
-      }
-
-      for (Types.NestedField field : finalSchema.columns()) {
-        int indexInReadSchema = struct.fieldIndex(field.name());
-        exprs.add(refs.get(indexInReadSchema));
-      }
-
-      return UnsafeProjection.create(
-          JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
-          JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
-    }
-
-    private CloseableIterable<InternalRow> newAvroIterable(InputFile location,
-                                                      FileScanTask task,
-                                                      Schema readSchema) {
-      return Avro.read(location)
-          .reuseContainers()
-          .project(readSchema)
-          .split(task.start(), task.length())
-          .createReaderFunc(SparkAvroReader::new)
-          .build();
-    }
-
-    private CloseableIterable<InternalRow> newParquetIterable(InputFile location,
-                                                            FileScanTask task,
-                                                            Schema readSchema) {
-      return Parquet.read(location)
-          .project(readSchema)
-          .split(task.start(), task.length())
-          .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
-          .filter(task.residual())
-          .caseSensitive(caseSensitive)
-          .build();
-    }
-
-    private CloseableIterable<InternalRow> newOrcIterable(InputFile location,
-                                                          FileScanTask task,
-                                                          Schema readSchema) {
-      return ORC.read(location)
-          .schema(readSchema)
-          .split(task.start(), task.length())
-          .createReaderFunc(SparkOrcReader::new)
-          .caseSensitive(caseSensitive)
-          .build();
-    }
-
-    private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
-      StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
-      CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
-          task.asDataTask().rows(), row::setStruct);
-      return CloseableIterable.transform(
-          asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
-    }
-  }
-
-  private static class PartitionRowConverter implements Function<StructLike, InternalRow> {
-    private final DataType[] types;
-    private final int[] positions;
-    private final Class<?>[] javaTypes;
-    private final GenericInternalRow reusedRow;
-
-    PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
-      StructType partitionType = SparkSchemaUtil.convert(partitionSchema);
-      StructField[] fields = partitionType.fields();
-
-      this.types = new DataType[fields.length];
-      this.positions = new int[types.length];
-      this.javaTypes = new Class<?>[types.length];
-      this.reusedRow = new GenericInternalRow(types.length);
-
-      List<PartitionField> partitionFields = spec.fields();
-      for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) {
-        this.types[rowIndex] = fields[rowIndex].dataType();
-
-        int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
-        for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) {
-          PartitionField field = spec.fields().get(specIndex);
-          if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) {
-            positions[rowIndex] = specIndex;
-            javaTypes[rowIndex] = spec.javaClasses()[specIndex];
-            break;
-          }
-        }
-      }
-    }
-
-    @Override
-    public InternalRow apply(StructLike tuple) {
-      for (int i = 0; i < types.length; i += 1) {
-        Object value = tuple.get(positions[i], javaTypes[i]);
-        if (value != null) {
-          reusedRow.update(i, convert(value, types[i]));
-        } else {
-          reusedRow.setNullAt(i);
-        }
-      }
-
-      return reusedRow;
-    }
-
-    /**
-     * Converts the objects into instances used by Spark's InternalRow.
-     *
-     * @param value a data value
-     * @param type the Spark data type
-     * @return the value converted to the representation expected by Spark's InternalRow.
-     */
-    private static Object convert(Object value, DataType type) {
-      if (type instanceof StringType) {
-        return UTF8String.fromString(value.toString());
-      } else if (type instanceof BinaryType) {
-        return ByteBuffers.toByteArray((ByteBuffer) value);
-      } else if (type instanceof DecimalType) {
-        return Decimal.fromDecimal(value);
-      }
-      return value;
-    }
-  }
-
   private static class StructLikeInternalRow implements StructLike {
     private final DataType[] types;
     private InternalRow row = null;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
new file mode 100644
index 0000000..ff5efea
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.SparkAvroReader;
+import org.apache.iceberg.spark.data.SparkOrcReader;
+import org.apache.iceberg.spark.data.SparkParquetReaders;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.catalyst.expressions.AttributeReference;
+import org.apache.spark.sql.catalyst.expressions.JoinedRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.types.StructType;
+import scala.collection.JavaConverters;
+
+class RowDataReader extends BaseDataReader<InternalRow> {
+  // for some reason, the apply method can't be called from Java without reflection
+  private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
+      .impl(UnsafeProjection.class, InternalRow.class)
+      .build();
+
+  private final Schema tableSchema;
+  private final Schema expectedSchema;
+  private final boolean caseSensitive;
+
+  RowDataReader(
+      CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo,
+      EncryptionManager encryptionManager, boolean caseSensitive) {
+    super(task, fileIo, encryptionManager);
+    this.tableSchema = tableSchema;
+    this.expectedSchema = expectedSchema;
+    this.caseSensitive = caseSensitive;
+  }
+
+  @Override
+  Iterator<InternalRow> open(FileScanTask task) {
+    DataFile file = task.file();
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
+
+    // schema or rows returned by readers
+    Schema finalSchema = expectedSchema;
+    PartitionSpec spec = task.spec();
+    Set<Integer> idColumns = spec.identitySourceIds();
+
+    // schema needed for the projection and filtering
+    StructType sparkType = SparkSchemaUtil.convert(finalSchema);
+    Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
+    boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
+    boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
+
+    Schema iterSchema;
+    Iterator<InternalRow> iter;
+
+    if (hasJoinedPartitionColumns) {
+      // schema used to read data files
+      Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
+      Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
+      PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
+      JoinedRow joined = new JoinedRow();
+
+      InternalRow partition = convertToRow.apply(file.partition());
+      joined.withRight(partition);
+
+      // create joined rows and project from the joined schema to the final schema
+      iterSchema = TypeUtil.join(readSchema, partitionSchema);
+      iter = Iterators.transform(open(task, readSchema), joined::withLeft);
+    } else if (hasExtraFilterColumns) {
+      // add projection to the final schema
+      iterSchema = requiredSchema;
+      iter = open(task, requiredSchema);
+    } else {
+      // return the base iterator
+      iterSchema = finalSchema;
+      iter = open(task, finalSchema);
+    }
+
+    // TODO: remove the projection by reporting the iterator's schema back to Spark
+    return Iterators.transform(
+        iter,
+        APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
+  }
+
+  private Iterator<InternalRow> open(FileScanTask task, Schema readSchema) {
+    CloseableIterable<InternalRow> iter;
+    if (task.isDataTask()) {
+      iter = newDataIterable(task.asDataTask(), readSchema);
+    } else {
+      InputFile location = getInputFile(task);
+      Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
+
+      switch (task.file().format()) {
+        case PARQUET:
+          iter = newParquetIterable(location, task, readSchema);
+          break;
+
+        case AVRO:
+          iter = newAvroIterable(location, task, readSchema);
+          break;
+
+        case ORC:
+          iter = newOrcIterable(location, task, readSchema);
+          break;
+
+        default:
+          throw new UnsupportedOperationException(
+              "Cannot read unknown format: " + task.file().format());
+      }
+    }
+
+    this.currentCloseable = iter;
+
+    return iter.iterator();
+  }
+
+  private CloseableIterable<InternalRow> newAvroIterable(
+      InputFile location,
+      FileScanTask task,
+      Schema readSchema) {
+    return Avro.read(location)
+        .reuseContainers()
+        .project(readSchema)
+        .split(task.start(), task.length())
+        .createReaderFunc(SparkAvroReader::new)
+        .build();
+  }
+
+  private CloseableIterable<InternalRow> newParquetIterable(
+      InputFile location,
+      FileScanTask task,
+      Schema readSchema) {
+    return Parquet.read(location)
+        .project(readSchema)
+        .split(task.start(), task.length())
+        .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
+        .filter(task.residual())
+        .caseSensitive(caseSensitive)
+        .build();
+  }
+
+  private CloseableIterable<InternalRow> newOrcIterable(
+      InputFile location,
+      FileScanTask task,
+      Schema readSchema) {
+    return ORC.read(location)
+        .schema(readSchema)
+        .split(task.start(), task.length())
+        .createReaderFunc(SparkOrcReader::new)
+        .caseSensitive(caseSensitive)
+        .build();
+  }
+
+  private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
+    StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
+    CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
+        task.asDataTask().rows(), row::setStruct);
+    return CloseableIterable.transform(
+        asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
+  }
+
+  private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) {
+    StructType struct = SparkSchemaUtil.convert(readSchema);
+
+    List<AttributeReference> refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava();
+    List<Attribute> attrs = Lists.newArrayListWithExpectedSize(struct.fields().length);
+    List<org.apache.spark.sql.catalyst.expressions.Expression> exprs =
+        Lists.newArrayListWithExpectedSize(struct.fields().length);
+
+    for (AttributeReference ref : refs) {
+      attrs.add(ref.toAttribute());
+    }
+
+    for (Types.NestedField field : finalSchema.columns()) {
+      int indexInReadSchema = struct.fieldIndex(field.name());
+      exprs.add(refs.get(indexInReadSchema));
+    }
+
+    return UnsafeProjection.create(
+        JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
+        JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
+  }
+}
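
For completeness, constructing the new reader mirrors the one-line change to createPartitionReader() in the Reader.java hunk above; `io` and `encryptionManager` are Reader's broadcast FileIO and EncryptionManager.

    // Condensed from Reader's ReadTask.createPartitionReader() above.
    InputPartitionReader<InternalRow> reader = new RowDataReader(
        task, lazyTableSchema(), lazyExpectedSchema(), io.value(),
        encryptionManager.value(), caseSensitive);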