You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/19 19:14:00 UTC

[incubator-iceberg] branch vectorized-read updated: Enable a path for consuming applications to not reuse underlying arrow buffers. (#707)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch vectorized-read
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/vectorized-read by this push:
     new 1961674  Enable a path for consuming applications to not reuse underlying arrow buffers. (#707)
1961674 is described below

commit 1961674582b9a6252a43cd2150564f798d478fef
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Thu Dec 19 11:13:48 2019 -0800

    Enable a path for consuming applications to not reuse underlying arrow buffers. (#707)
    
    Clean up interfaces and remove unused code.
---
 .../iceberg/arrow/vectorized/VectorHolder.java     | 11 ++++-
 .../arrow/vectorized/VectorizedArrowReader.java    | 18 +++++---
 .../parquet/VectorizedParquetValuesReader.java     | 10 ++---
 .../java/org/apache/iceberg/parquet/Parquet.java   |  6 +--
 .../vectorized/VectorizedParquetReader.java        | 21 ++++------
 .../parquet/vectorized/VectorizedReader.java       |  8 ++--
 .../data/vectorized/ColumnarBatchReaders.java      | 22 +++++-----
 .../data/vectorized/IcebergArrowColumnVector.java  |  4 +-
 .../vectorized/VectorizedSparkParquetReaders.java  |  9 ++--
 .../org/apache/iceberg/spark/source/Reader.java    |  2 -
 .../iceberg/spark/source/VectorizedReading.java    | 48 +---------------------
 11 files changed, 61 insertions(+), 98 deletions(-)

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
index 5cf0ae8..33e7427 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
@@ -35,18 +35,21 @@ public class VectorHolder {
 
   @Nullable
   private final Dictionary dictionary;
+  private final NullabilityHolder nullabilityHolder;
 
-  public static final VectorHolder NULL_VECTOR_HOLDER = new VectorHolder(null, null, false, null);
+  public static final VectorHolder NULL_VECTOR_HOLDER = new VectorHolder(null, null, false, null, null);
 
   public VectorHolder(
       ColumnDescriptor columnDescriptor,
       FieldVector vector,
       boolean isDictionaryEncoded,
-      Dictionary dictionary) {
+      Dictionary dictionary,
+      NullabilityHolder holder) {
     this.columnDescriptor = columnDescriptor;
     this.vector = vector;
     this.isDictionaryEncoded = isDictionaryEncoded;
     this.dictionary = dictionary;
+    this.nullabilityHolder = holder;
   }
 
   public ColumnDescriptor getDescriptor() {
@@ -64,4 +67,8 @@ public class VectorHolder {
   public Dictionary getDictionary() {
     return dictionary;
   }
+
+  public NullabilityHolder getNullabilityHolder() {
+    return nullabilityHolder;
+  }
 }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index 3b0f022..ab8e5b8 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -51,7 +51,7 @@ import org.apache.parquet.schema.PrimitiveType;
  * It also takes care of allocating the right kind of Arrow vectors depending on the corresponding
  * Iceberg/Parquet data types.
  */
-public class VectorizedArrowReader implements VectorizedReader<NullabilityHolder, VectorHolder> {
+public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
   public static final int DEFAULT_BATCH_SIZE = 5000;
   public static final int UNKNOWN_WIDTH = -1;
 
@@ -71,6 +71,8 @@ public class VectorizedArrowReader implements VectorizedReader<NullabilityHolder
   private final BufferAllocator rootAlloc;
   private FieldVector vec;
   private int typeWidth;
+  private boolean reuseContainers = true;
+  private NullabilityHolder nullabilityHolder;
 
   // In cases when Parquet employs fall back encoding, we eagerly decode the dictionary encoded data
   // before storing the values in the Arrow vector. This means even if the dictionary is present, data
@@ -122,11 +124,12 @@ public class VectorizedArrowReader implements VectorizedReader<NullabilityHolder
 
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   @Override
-  public VectorHolder read(NullabilityHolder nullabilityHolder) {
-    if (vec == null) {
+  public VectorHolder read() {
+    if (vec == null || !reuseContainers) {
       typeWidth = allocateFieldVector();
     }
     vec.setValueCount(0);
+    nullabilityHolder = new NullabilityHolder(batchSize);
     if (vectorizedColumnIterator.hasNext()) {
       if (allPagesDictEncoded) {
         vectorizedColumnIterator.nextBatchDictionaryIds((IntVector) vec, nullabilityHolder);
@@ -163,7 +166,7 @@ public class VectorizedArrowReader implements VectorizedReader<NullabilityHolder
         }
       }
     }
-    return new VectorHolder(columnDescriptor, vec, allPagesDictEncoded, dictionary);
+    return new VectorHolder(columnDescriptor, vec, allPagesDictEncoded, dictionary, nullabilityHolder);
   }
 
   private int allocateFieldVector() {
@@ -280,6 +283,11 @@ public class VectorizedArrowReader implements VectorizedReader<NullabilityHolder
   }
 
   @Override
+  public void reuseContainers(boolean reuse) {
+    this.reuseContainers = reuse;
+  }
+
+  @Override
   public String toString() {
     return columnDescriptor.toString();
   }
@@ -287,7 +295,7 @@ public class VectorizedArrowReader implements VectorizedReader<NullabilityHolder
   public static final VectorizedArrowReader NULL_VALUES_READER =
       new VectorizedArrowReader() {
         @Override
-        public VectorHolder read(NullabilityHolder holder) {
+        public VectorHolder read() {
           return VectorHolder.NULL_VECTOR_HOLDER;
         }
 
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetValuesReader.java
index 0e2256e..b4b6dc2 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetValuesReader.java
@@ -1008,7 +1008,6 @@ public final class VectorizedParquetValuesReader extends ValuesReader {
       switch (mode) {
         case RLE:
           for (int i = 0; i < num; i++) {
-            // TODO: samarth I am assuming/hopeful that the decimalBytes array has typeWidth length
             byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
             byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
             System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
@@ -1018,7 +1017,6 @@ public final class VectorizedParquetValuesReader extends ValuesReader {
           break;
         case PACKED:
           for (int i = 0; i < num; i++) {
-            // TODO: samarth I am assuming/hopeful that the decimal bytes has typeWidth length
             byte[] decimalBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
             byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
             System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
@@ -1198,13 +1196,11 @@ public final class VectorizedParquetValuesReader extends ValuesReader {
               byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
               valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
               vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
-              bufferIdx++;
-              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
+              BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), bufferIdx);
             } else {
-              //BitVectorHelper.setValidityBitToOne(validityBuffer, validityBufferIdx);
-              nullabilityHolder.setNull(bufferIdx);
-              bufferIdx++;
+              setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
             }
+            bufferIdx++;
           }
           break;
       }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 52a1cde..9ea7713 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -286,7 +286,7 @@ public class Parquet {
     private Schema schema = null;
     private Expression filter = null;
     private ReadSupport<?> readSupport = null;
-    private Function<MessageType, VectorizedReader> batchedReaderFunc = null;
+    private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
     private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
     private boolean isBatchedReadEnabled = false;
     private boolean filterRecords = true;
@@ -347,7 +347,7 @@ public class Parquet {
       return this;
     }
 
-    public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader> func) {
+    public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) {
       this.batchedReaderFunc = func;
       return this;
     }
@@ -404,7 +404,7 @@ public class Parquet {
         ParquetReadOptions options = optionsBuilder.build();
 
         if (isBatchedReadEnabled) {
-          return new VectorizedParquetReader<>(file, schema, options, batchedReaderFunc, filter, reuseContainers,
+          return new VectorizedParquetReader(file, schema, options, batchedReaderFunc, filter, reuseContainers,
               caseSensitive, maxRecordsPerBatch);
         } else {
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
index 00528d5..2d89cd5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedParquetReader.java
@@ -52,7 +52,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
   private final InputFile input;
   private final Schema expectedSchema;
   private final ParquetReadOptions options;
-  private final Function<MessageType, VectorizedReader> batchReaderFunc;
+  private final Function<MessageType, VectorizedReader<T>> batchReaderFunc;
   private final Expression filter;
   private final boolean reuseContainers;
   private final boolean caseSensitive;
@@ -60,7 +60,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
 
   public VectorizedParquetReader(
       InputFile input, Schema expectedSchema, ParquetReadOptions options,
-      Function<MessageType, VectorizedReader> readerFunc,
+      Function<MessageType, VectorizedReader<T>> readerFunc,
       Expression filter, boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch) {
     this.input = input;
     this.expectedSchema = expectedSchema;
@@ -78,7 +78,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
     private final InputFile file;
     private final ParquetReadOptions options;
     private final MessageType projection;
-    private final VectorizedReader model;
+    private final VectorizedReader<T> model;
     private final List<BlockMetaData> rowGroups;
     private final boolean[] shouldSkip;
     private final long totalValues;
@@ -87,7 +87,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
 
     @SuppressWarnings("unchecked")
     ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
-        Function<MessageType, VectorizedReader> readerFunc, boolean reuseContainers,
+        Function<MessageType, VectorizedReader<T>> readerFunc, boolean reuseContainers,
         boolean caseSensitive, int bSize) {
       this.file = file;
       this.options = options;
@@ -190,7 +190,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
 
   private ReadConf init() {
     if (conf == null) {
-      ReadConf<T> readConf = new ReadConf(
+      ReadConf readConf = new ReadConf(
           input, options, expectedSchema, filter, batchReaderFunc, reuseContainers, caseSensitive, batchSize);
       this.conf = readConf.copy();
       return readConf;
@@ -209,9 +209,8 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
   private static class FileIterator<T> implements Iterator<T>, Closeable {
     private final ParquetFileReader reader;
     private final boolean[] shouldSkip;
-    private final VectorizedReader model;
+    private final VectorizedReader<T> model;
     private final long totalValues;
-    private final boolean reuseContainers;
     private final int batchSize;
 
     private int nextRowGroup = 0;
@@ -224,7 +223,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
       this.shouldSkip = conf.shouldSkip();
       this.model = conf.model();
       this.totalValues = conf.totalValues();
-      this.reuseContainers = conf.reuseContainers();
+      this.model.reuseContainers(conf.reuseContainers());
       this.batchSize = conf.batchSize();
     }
 
@@ -241,11 +240,7 @@ public class VectorizedParquetReader<T> extends CloseableGroup implements Closea
       if (valuesRead >= nextRowGroupStart) {
         advance();
       }
-      if (reuseContainers) {
-        this.last = (T) model.read(null);
-      } else {
-        this.last = (T) model.read(null);
-      }
+      this.last = model.read();
       valuesRead += Math.min(nextRowGroupStart - valuesRead, batchSize);
       return last;
     }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
index 5fb78be..d181162 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/vectorized/VectorizedReader.java
@@ -25,12 +25,14 @@ import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 
 /**
- * Marker interface for vectorized Iceberg readers.
+ * Interface for vectorized Iceberg readers.
  */
-public interface VectorizedReader<E, T> {
-  T read(E input);
+public interface VectorizedReader<T> {
+  T read();
 
   void setRowGroupInfo(PageReadStore pages, DictionaryPageReadStore dictionaryPageReadStore,
                        Map<ColumnPath, Boolean> columnPathBooleanMap);
+
+  void reuseContainers(boolean reuse);
 }
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReaders.java
index 18c2b12..d266fc1 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReaders.java
@@ -23,15 +23,12 @@ import java.lang.reflect.Array;
 import java.util.List;
 import java.util.Map;
 import org.apache.arrow.vector.FieldVector;
-import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
 import org.apache.iceberg.arrow.vectorized.VectorHolder;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
 import org.apache.iceberg.parquet.vectorized.VectorizedReader;
-import org.apache.iceberg.types.Types;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.schema.Type;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
@@ -40,13 +37,12 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
  * {@link ColumnarBatch} returned is created by passing in the Arrow vectors populated via delegated read calls to
  * {@linkplain VectorizedArrowReader VectorReader(s)}.
  */
-public class ColumnarBatchReaders implements VectorizedReader<ColumnarBatch, ColumnarBatch> {
+public class ColumnarBatchReaders implements VectorizedReader<ColumnarBatch> {
   private final VectorizedArrowReader[] readers;
   private final int batchSize;
+  private boolean reuseContainers;
 
   public ColumnarBatchReaders(
-      List<Type> types,
-      Types.StructType icebergExpectedFields,
       List<VectorizedReader> readers,
       int bSize) {
     this.readers = (VectorizedArrowReader[]) Array.newInstance(
@@ -72,17 +68,23 @@ public class ColumnarBatchReaders implements VectorizedReader<ColumnarBatch, Col
   }
 
   @Override
-  public final ColumnarBatch read(ColumnarBatch ignore) {
+  public void reuseContainers(boolean reuse) {
+    for (VectorizedReader reader : readers) {
+      reader.reuseContainers(reuse);
+    }
+  }
+
+  @Override
+  public final ColumnarBatch read() {
     ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];
     int numRows = 0;
     for (int i = 0; i < readers.length; i += 1) {
-      NullabilityHolder nullabilityHolder = new NullabilityHolder(batchSize);
-      VectorHolder holder = readers[i].read(nullabilityHolder);
+      VectorHolder holder = readers[i].read();
       FieldVector vector = holder.getVector();
       if (vector == null) {
         arrowColumnVectors[i] = new NullValuesColumnVector(batchSize);
       } else {
-        arrowColumnVectors[i] = new IcebergArrowColumnVector(holder, nullabilityHolder);
+        arrowColumnVectors[i] = new IcebergArrowColumnVector(holder);
         numRows = vector.getValueCount();
       }
     }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
index 076b5f3..ae6e44c 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
@@ -70,9 +70,9 @@ public class IcebergArrowColumnVector extends ColumnVector {
   private final boolean isVectorDictEncoded;
   private ArrowColumnVector[] childColumns;
 
-  public IcebergArrowColumnVector(VectorHolder holder, NullabilityHolder nulls) {
+  public IcebergArrowColumnVector(VectorHolder holder) {
     super(ArrowUtils.instance().fromArrowField(holder.getVector().getField()));
-    this.nullabilityHolder = nulls;
+    this.nullabilityHolder = holder.getNullabilityHolder();
     this.columnDescriptor = holder.getDescriptor();
     this.dictionary = holder.getDictionary();
     this.isVectorDictEncoded = holder.isDictionaryEncoded();
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index 0bc96ae..8650ea6 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -65,17 +65,17 @@ public class VectorizedSparkParquetReaders {
     LOG.info("=> [VectorizedSparkParquetReaders] recordsPerBatch = {}", recordsPerBatch);
     return (ColumnarBatchReaders)
         TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
-            new VectorReaderBuilder(tableSchema, expectedSchema, fileSchema, recordsPerBatch));
+            new VectorizedReaderBuilder(tableSchema, expectedSchema, fileSchema, recordsPerBatch));
   }
 
-  private static class VectorReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader> {
+  private static class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader> {
     private final MessageType parquetSchema;
     private final Schema projectedIcebergSchema;
     private final Schema tableIcebergSchema;
     private final BufferAllocator rootAllocator;
     private final int recordsPerBatch;
 
-    VectorReaderBuilder(
+    VectorizedReaderBuilder(
         Schema tableSchema,
         Schema projectedIcebergSchema,
         MessageType parquetSchema,
@@ -134,8 +134,7 @@ public class VectorizedSparkParquetReaders {
           types.add(null);
         }
       }
-
-      return new ColumnarBatchReaders(types, expected, reorderedFields, recordsPerBatch);
+      return new ColumnarBatchReaders(reorderedFields, recordsPerBatch);
     }
 
     @Override
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 24a23dd..49bfd2a 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -145,8 +145,6 @@ class Reader implements DataSourceReader,
 
       this.numRecordsPerBatch = VectorizedArrowReader.DEFAULT_BATCH_SIZE;
     }
-    LOG.info("=> Set Config numRecordsPerBatch = {}", numRecordsPerBatch);
-
     if (snapshotId != null && asOfTimestamp != null) {
       throw new IllegalArgumentException(
           "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/VectorizedReading.java b/spark/src/main/java/org/apache/iceberg/spark/source/VectorizedReading.java
index 3e4d1b6..939acac 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/VectorizedReading.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/VectorizedReading.java
@@ -30,9 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
 import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -44,11 +42,9 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.parquet.vectorized.VectorizedReader;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.Types;
-import org.apache.parquet.schema.MessageType;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
@@ -194,8 +190,6 @@ public class VectorizedReading {
     }
 
     private Iterator<ColumnarBatch> open(FileScanTask task) {
-      DataFile file = task.file();
-
       // schema or rows returned by readers
       Schema finalSchema = expectedSchema;
       PartitionSpec spec = task.spec();
@@ -205,47 +199,17 @@ public class VectorizedReading {
       StructType sparkType = SparkSchemaUtil.convert(finalSchema);
       Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
       boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
-
-      Schema iterSchema;
       Iterator<ColumnarBatch> iter;
-
-      // if (hasJoinedPartitionColumns) {
-      // // schema used to read data files
-      // Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns);
-      // Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns);
-      // PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec);
-      // JoinedRow joined = new JoinedRow();
-      //
-      // InternalRow partition = convertToRow.apply(file.partition());
-      // joined.withRight(partition);
-      //
-      // // create joined rows and project from the joined schema to the final schema
-      // iterSchema = TypeUtil.join(readSchema, partitionSchema);
-      // iter = Iterators.transform(open(task, readSchema), joined::withLeft);
-      //
-      // } else if (hasExtraFilterColumns) {
       if (hasExtraFilterColumns) {
-        // add projection to the final schema
-        iterSchema = requiredSchema;
         iter = open(task, requiredSchema);
       } else {
-        // return the base iterator
-        iterSchema = finalSchema;
         iter = open(task, finalSchema);
       }
-
-      // TODO: remove the projection by reporting the iterator's schema back to Spark
-      // return Iterators.transform(iter,
-      //     APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
       return iter;
     }
 
     private Iterator<ColumnarBatch> open(FileScanTask task, Schema readSchema) {
       CloseableIterable<ColumnarBatch> iter;
-      // if (task.isDataTask()) {
-      //   iter = newDataIterable(task.asDataTask(), readSchema);
-      //
-      // } else {
       InputFile location = inputFiles.get(task.file().path().toString());
       Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask");
 
@@ -256,11 +220,8 @@ public class VectorizedReading {
         default:
           throw new UnsupportedOperationException(
               "Cannot read unknown format: " + task.file().format());
-          // }
       }
-
       this.currentCloseable = iter;
-
       return iter.iterator();
     }
 
@@ -294,13 +255,8 @@ public class VectorizedReading {
           .project(readSchema)
           .split(task.start(), task.length())
           .enableBatchedRead()
-          .createBatchedReaderFunc(new Function<MessageType, VectorizedReader>() {
-            @Override
-            public VectorizedReader apply(MessageType fileSchema) {
-              return VectorizedSparkParquetReaders.buildReader(tableSchema, readSchema,
-                  fileSchema, numRecordsPerBatch);
-            }
-          })
+          .createBatchedReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(tableSchema, readSchema,
+              fileSchema, numRecordsPerBatch))
           .filter(task.residual())
           .caseSensitive(caseSensitive)
           .recordsPerBatch(numRecordsPerBatch)