Posted to dev@iceberg.apache.org by GitBox <gi...@apache.org> on 2018/12/10 17:35:17 UTC

[GitHub] rdblue closed pull request #30: Update to Spark 2.4

rdblue closed pull request #30: Update to Spark 2.4
URL: https://github.com/apache/incubator-iceberg/pull/30

This is a pull request merged from a forked repository. Because GitHub hides
the original diff once a forked pull request is merged, the diff is reproduced
below for the sake of provenance:

diff --git a/build.gradle b/build.gradle
index d41eb13..096156b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -75,7 +75,7 @@ subprojects {
     jacksonVersion = '2.6.7'
 
     scalaVersion = '2.11'
-    sparkVersion = '2.3.2'
+    sparkVersion = '2.4.0'
   }
 
   sourceCompatibility = '1.8'
@@ -160,8 +160,8 @@ project(':iceberg-orc') {
     compile("org.apache.orc:orc-core:$orcVersion:nohive") {
       exclude group: 'org.apache.hadoop', module: 'hadoop-common'
     }
-    
-    
+
+
     compileOnly("org.apache.hadoop:hadoop-client:$hadoopVersion") {
       exclude group: 'org.apache.avro', module: 'avro'
     }
@@ -190,7 +190,6 @@ project(':iceberg-spark') {
     compile project(':iceberg-api')
     compile project(':iceberg-common')
     compile project(':iceberg-core')
-    compile project(':iceberg-orc')
     compile project(':iceberg-parquet')
 
     compileOnly "org.apache.avro:avro:$avroVersion"
@@ -249,7 +248,6 @@ project(':iceberg-runtime') {
     shadow project(':iceberg-api')
     shadow project(':iceberg-common')
     shadow project(':iceberg-core')
-    shadow project(':iceberg-orc')
     shadow project(':iceberg-parquet')
     shadow project(':iceberg-spark')
     shadow project(':iceberg-pig')
@@ -302,7 +300,7 @@ project(':iceberg-presto-runtime') {
         shadow "org.apache.avro:avro:$avroVersion"
         shadow ("org.apache.hive:hive-standalone-metastore:$hiveVersion") {
             exclude group: 'org.apache.hadoop', module: 'hadoop-common'
-            exclude group: 'org.apache.orc', module: 'orc-core'
+//            exclude group: 'org.apache.orc', module: 'orc-core'
         }
     }
 
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/data/SparkOrcReader.java b/spark/src/main/java/com/netflix/iceberg/spark/data/SparkOrcReader.java
deleted file mode 100644
index 6be855e..0000000
--- a/spark/src/main/java/com/netflix/iceberg/spark/data/SparkOrcReader.java
+++ /dev/null
@@ -1,870 +0,0 @@
-/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.netflix.iceberg.spark.data;
-
-import com.netflix.iceberg.FileScanTask;
-import com.netflix.iceberg.Schema;
-import com.netflix.iceberg.io.InputFile;
-import com.netflix.iceberg.orc.ColumnIdMap;
-import com.netflix.iceberg.orc.ORC;
-import com.netflix.iceberg.orc.OrcIterator;
-import com.netflix.iceberg.orc.TypeConversion;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.FastHiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
-import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.storage.serde2.io.DateWritable;
-import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
-import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
-import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter;
-import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.array.ByteArrayMethods;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Converts the OrcIterator, which returns ORC's VectorizedRowBatch, to a
- * set of Spark's UnsafeRows.
- *
- * It minimizes allocations by reusing most of the objects in the implementation.
- */
-public class SparkOrcReader implements Iterator<InternalRow>, Closeable {
-  private final static int INITIAL_SIZE = 128 * 1024;
-  private final OrcIterator reader;
-  private final TypeDescription orcSchema;
-  private final UnsafeRow row;
-  private final BufferHolder holder;
-  private final UnsafeRowWriter writer;
-  private int nextRow = 0;
-  private VectorizedRowBatch current = null;
-  private Converter[] converter;
-
-  public SparkOrcReader(InputFile location,
-                        FileScanTask task,
-                        Schema readSchema) {
-    ColumnIdMap columnIds = new ColumnIdMap();
-    orcSchema = TypeConversion.toOrc(readSchema, columnIds);
-    reader = ORC.read(location)
-        .split(task.start(), task.length())
-        .schema(readSchema)
-        .build();
-    int numFields = readSchema.columns().size();
-    row = new UnsafeRow(numFields);
-    holder = new BufferHolder(row, INITIAL_SIZE);
-    writer = new UnsafeRowWriter(holder, numFields);
-    converter = new Converter[numFields];
-    for(int c=0; c < numFields; ++c) {
-      converter[c] = buildConverter(holder, orcSchema.getChildren().get(c));
-    }
-  }
-
-  @Override
-  public boolean hasNext() {
-    return (current != null && nextRow < current.size) || reader.hasNext();
-  }
-
-  @Override
-  public UnsafeRow next() {
-    if (current == null || nextRow >= current.size) {
-      current = reader.next();
-      nextRow = 0;
-    }
-    // Reset the holder to start the buffer over again.
-    // BufferHolder.reset does the wrong thing...
-    holder.cursor = Platform.BYTE_ARRAY_OFFSET;
-    writer.reset();
-    for(int c=0; c < current.cols.length; ++c) {
-      converter[c].convert(writer, c, current.cols[c], nextRow);
-    }
-    nextRow++;
-    return row;
-  }
-
-  @Override
-  public void close() throws IOException {
-    reader.close();
-  }
-
-  private static void printRow(SpecializedGetters row, TypeDescription schema) {
-    List<TypeDescription> children = schema.getChildren();
-    System.out.print("{");
-    for(int c = 0; c < children.size(); ++c) {
-      System.out.print("\"" + schema.getFieldNames().get(c) + "\": ");
-      printRow(row, c, children.get(c));
-    }
-    System.out.print("}");
-  }
-
-  private static void printRow(SpecializedGetters row, int ord, TypeDescription schema) {
-    switch (schema.getCategory()) {
-      case BOOLEAN:
-        System.out.print(row.getBoolean(ord));
-        break;
-      case BYTE:
-        System.out.print(row.getByte(ord));
-        break;
-      case SHORT:
-        System.out.print(row.getShort(ord));
-        break;
-      case INT:
-        System.out.print(row.getInt(ord));
-        break;
-      case LONG:
-        System.out.print(row.getLong(ord));
-        break;
-      case FLOAT:
-        System.out.print(row.getFloat(ord));
-        break;
-      case DOUBLE:
-        System.out.print(row.getDouble(ord));
-        break;
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        System.out.print("\"" + row.getUTF8String(ord) + "\"");
-        break;
-      case BINARY: {
-        byte[] bin = row.getBinary(ord);
-        if (bin == null) {
-          System.out.print("null");
-        } else {
-          System.out.print("[");
-          for (int i = 0; i < bin.length; ++i) {
-            if (i != 0) {
-              System.out.print(", ");
-            }
-            int v = bin[i] & 0xff;
-            if (v < 16) {
-              System.out.print("0" + Integer.toHexString(v));
-            } else {
-              System.out.print(Integer.toHexString(v));
-            }
-          }
-          System.out.print("]");
-        }
-        break;
-      }
-      case DECIMAL:
-        System.out.print(row.getDecimal(ord, schema.getPrecision(), schema.getScale()));
-        break;
-      case DATE:
-        System.out.print("\"" + new DateWritable(row.getInt(ord)) + "\"");
-        break;
-      case TIMESTAMP:
-        System.out.print("\"" + new Timestamp(row.getLong(ord)) + "\"");
-        break;
-      case STRUCT:
-        printRow(row.getStruct(ord, schema.getChildren().size()), schema);
-        break;
-      case LIST: {
-        TypeDescription child = schema.getChildren().get(0);
-        System.out.print("[");
-        ArrayData list = row.getArray(ord);
-        for(int e=0; e < list.numElements(); ++e) {
-          if (e != 0) {
-            System.out.print(", ");
-          }
-          printRow(list, e, child);
-        }
-        System.out.print("]");
-        break;
-      }
-      case MAP: {
-        TypeDescription keyType = schema.getChildren().get(0);
-        TypeDescription valueType = schema.getChildren().get(1);
-        MapData map = row.getMap(ord);
-        ArrayData keys = map.keyArray();
-        ArrayData values = map.valueArray();
-        System.out.print("[");
-        for(int e=0; e < map.numElements(); ++e) {
-          if (e != 0) {
-            System.out.print(", ");
-          }
-          printRow(keys, e, keyType);
-          System.out.print(": ");
-          printRow(values, e, valueType);
-        }
-        System.out.print("]");
-        break;
-      }
-      default:
-        throw new IllegalArgumentException("Unhandled type " + schema);
-    }
-  }
-  static int getArrayElementSize(TypeDescription type) {
-    switch (type.getCategory()) {
-      case BOOLEAN:
-      case BYTE:
-        return 1;
-      case SHORT:
-        return 2;
-      case INT:
-      case FLOAT:
-        return 4;
-      default:
-        return 8;
-    }
-  }
-
-  /**
-   * The common interface for converting from an ORC ColumnVector to a Spark
-   * UnsafeRow. UnsafeRows need two different interfaces for writers, and thus
-   * we have two methods: the first is for structs (UnsafeRowWriter) and the
-   * second is for lists and maps (UnsafeArrayWriter). If Spark adds a common
-   * interface similar to SpecializedGetters, we could use that and a single
-   * set of methods.
-   */
-  interface Converter {
-    void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row);
-    void convert(UnsafeArrayWriter writer, int element, ColumnVector vector,
-                 int row);
-  }
-
-  private static class BooleanConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, ((LongColumnVector) vector).vector[row] != 0);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, ((LongColumnVector) vector).vector[row] != 0);
-      }
-    }
-  }
-
-  private static class ByteConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, (byte) ((LongColumnVector) vector).vector[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, (byte) ((LongColumnVector) vector).vector[row]);
-      }
-    }
-  }
-
-  private static class ShortConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, (short) ((LongColumnVector) vector).vector[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, (short) ((LongColumnVector) vector).vector[row]);
-      }
-    }
-  }
-
-  private static class IntConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, (int) ((LongColumnVector) vector).vector[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, (int) ((LongColumnVector) vector).vector[row]);
-      }
-    }
-  }
-
-  private static class LongConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, ((LongColumnVector) vector).vector[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, ((LongColumnVector) vector).vector[row]);
-      }
-    }
-  }
-
-  private static class FloatConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, (float) ((DoubleColumnVector) vector).vector[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, (float) ((DoubleColumnVector) vector).vector[row]);
-      }
-    }
-  }
-
-  private static class DoubleConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, ((DoubleColumnVector) vector).vector[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, ((DoubleColumnVector) vector).vector[row]);
-      }
-    }
-  }
-
-  private static class TimestampConverter implements Converter {
-
-    private long convert(TimestampColumnVector vector, int row) {
-      // compute microseconds past 1970.
-      long micros = (vector.time[row]/1000) * 1_000_000 + vector.nanos[row] / 1000;
-      return micros;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, convert((TimestampColumnVector) vector, row));
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, convert((TimestampColumnVector) vector, row));
-      }
-    }
-  }
-
-  /**
-   * UnsafeArrayWriter doesn't have a binary form that lets the user pass an
-   * offset and length, so I've added one here. It is a minor tweak of the
-   * UnsafeArrayWriter.write(int, byte[]) method.
-   * @param holder the BufferHolder where the bytes are being written
-   * @param writer the UnsafeArrayWriter
-   * @param ordinal the element that we are writing into
-   * @param input the input bytes
-   * @param offset the first byte from input
-   * @param length the number of bytes to write
-   */
-  static void write(BufferHolder holder, UnsafeArrayWriter writer, int ordinal,
-                    byte[] input, int offset, int length) {
-    final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(length);
-
-    // grow the global buffer before writing data.
-    holder.grow(roundedSize);
-
-    if ((length & 0x07) > 0) {
-      Platform.putLong(holder.buffer, holder.cursor + ((length >> 3) << 3), 0L);
-    }
-
-    // Write the bytes to the variable length portion.
-    Platform.copyMemory(input, Platform.BYTE_ARRAY_OFFSET + offset,
-        holder.buffer, holder.cursor, length);
-
-    writer.setOffsetAndSize(ordinal, holder.cursor, length);
-
-    // move the cursor forward.
-    holder.cursor += roundedSize;
-  }
-
-  private static class BinaryConverter implements Converter {
-    private final BufferHolder holder;
-
-    BinaryConverter(BufferHolder holder) {
-      this.holder = holder;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        BytesColumnVector v = (BytesColumnVector) vector;
-        writer.write(column, v.vector[row], v.start[row], v.length[row]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        BytesColumnVector v = (BytesColumnVector) vector;
-        write(holder, writer, element, v.vector[row], v.start[row],
-            v.length[row]);
-      }
-    }
-  }
-
-  /**
-   * This hack is to get the unscaled value (for precision <= 18) quickly.
-   * This can be replaced when we upgrade to storage-api 2.5.0.
-   */
-  static class DecimalHack extends FastHiveDecimal {
-    long unscaledLong(FastHiveDecimal value) {
-      fastSet(value);
-      return fastSignum * fast1 * 10_000_000_000_000_000L + fast0;
-    }
-  }
-
-  private static class Decimal18Converter implements Converter {
-    final DecimalHack hack = new DecimalHack();
-    final int precision;
-    final int scale;
-
-    Decimal18Converter(int precision, int scale) {
-      this.precision = precision;
-      this.scale = scale;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        HiveDecimalWritable v = ((DecimalColumnVector) vector).vector[row];
-        writer.write(column,
-            new Decimal().set(hack.unscaledLong(v), precision, v.scale()),
-            precision, scale);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        HiveDecimalWritable v = ((DecimalColumnVector) vector).vector[row];
-        writer.write(element,
-            new Decimal().set(hack.unscaledLong(v), precision, v.scale()),
-            precision, scale);
-      }
-    }
-  }
-
-  private static class Decimal38Converter implements Converter {
-    final int precision;
-    final int scale;
-
-    Decimal38Converter(int precision, int scale) {
-      this.precision = precision;
-      this.scale = scale;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        BigDecimal v = ((DecimalColumnVector) vector).vector[row]
-            .getHiveDecimal().bigDecimalValue();
-        writer.write(column,
-            new Decimal().set(new scala.math.BigDecimal(v), precision, scale),
-            precision, scale);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        BigDecimal v = ((DecimalColumnVector) vector).vector[row]
-            .getHiveDecimal().bigDecimalValue();
-        writer.write(element,
-            new Decimal().set(new scala.math.BigDecimal(v), precision, scale),
-            precision, scale);
-      }
-    }
-  }
-
-  private static class StructConverter implements Converter {
-    private final BufferHolder holder;
-    private final Converter[] children;
-    private final UnsafeRowWriter childWriter;
-
-    StructConverter(BufferHolder holder, TypeDescription schema) {
-      this.holder = holder;
-      children = new Converter[schema.getChildren().size()];
-      for(int c=0; c < children.length; ++c) {
-        children[c] = buildConverter(holder, schema.getChildren().get(c));
-      }
-      childWriter = new UnsafeRowWriter(holder, children.length);
-    }
-
-    int writeStruct(StructColumnVector vector, int row) {
-      int start = holder.cursor;
-      childWriter.reset();
-      for(int c=0; c < children.length; ++c) {
-        children[c].convert(childWriter, c, vector.fields[c], row);
-      }
-      return start;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        int start = writeStruct((StructColumnVector) vector, row);
-        writer.setOffsetAndSize(column, start, holder.cursor - start);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        int start = writeStruct((StructColumnVector) vector, row);
-        writer.setOffsetAndSize(element, start, holder.cursor - start);
-      }
-    }
-  }
-
-  private static class ListConverter implements Converter {
-    private final BufferHolder holder;
-    private final Converter children;
-    private final UnsafeArrayWriter childWriter;
-    private final int elementSize;
-
-    ListConverter(BufferHolder holder, TypeDescription schema) {
-      this.holder = holder;
-      TypeDescription child = schema.getChildren().get(0);
-      children = buildConverter(holder, child);
-      childWriter = new UnsafeArrayWriter();
-      elementSize = getArrayElementSize(child);
-    }
-
-    int writeList(ListColumnVector v, int row) {
-      int offset = (int) v.offsets[row];
-      int length = (int) v.lengths[row];
-      int start = holder.cursor;
-      childWriter.initialize(holder, length, elementSize);
-      for(int c=0; c < length; ++c) {
-        children.convert(childWriter, c, v.child, offset + c);
-      }
-      return start;
-     }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        int start = writeList((ListColumnVector) vector, row);
-        writer.setOffsetAndSize(column, start, holder.cursor - start);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        int start = writeList((ListColumnVector) vector, row);
-        writer.setOffsetAndSize(element, start, holder.cursor - start);
-      }
-    }
-  }
-
-  private static class MapConverter implements Converter {
-    private final BufferHolder holder;
-    private final Converter keyConvert;
-    private final Converter valueConvert;
-    private final UnsafeArrayWriter childWriter;
-    private final int keySize;
-    private final int valueSize;
-
-    MapConverter(BufferHolder holder, TypeDescription schema) {
-      this.holder = holder;
-      TypeDescription keyType = schema.getChildren().get(0);
-      TypeDescription valueType = schema.getChildren().get(1);
-      keyConvert = buildConverter(holder, keyType);
-      keySize = getArrayElementSize(keyType);
-      valueConvert = buildConverter(holder, valueType);
-      valueSize = getArrayElementSize(valueType);
-      childWriter = new UnsafeArrayWriter();
-    }
-
-    int writeMap(MapColumnVector v, int row) {
-      int offset = (int) v.offsets[row];
-      int length = (int) v.lengths[row];
-      int start = holder.cursor;
-      // save room for the key size
-      final int KEY_SIZE_BYTES = 8;
-      holder.grow(KEY_SIZE_BYTES);
-      holder.cursor += KEY_SIZE_BYTES;
-      // serialize the keys
-      childWriter.initialize(holder, length, keySize);
-      for(int c=0; c < length; ++c) {
-        keyConvert.convert(childWriter, c, v.keys, offset + c);
-      }
-      // store the serialized size of the keys
-      Platform.putLong(holder.buffer, start, holder.cursor - start - KEY_SIZE_BYTES);
-      // serialize the values
-      childWriter.initialize(holder, length, valueSize);
-      for(int c=0; c < length; ++c) {
-        valueConvert.convert(childWriter, c, v.values, offset + c);
-      }
-      return start;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector vector,
-                        int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNullAt(column);
-      } else {
-        int start = writeMap((MapColumnVector) vector, row);
-        writer.setOffsetAndSize(column, start, holder.cursor - start);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      if (vector.isRepeating) {
-        row = 0;
-      }
-      if (!vector.noNulls && vector.isNull[row]) {
-        writer.setNull(element);
-      } else {
-        int start = writeMap((MapColumnVector) vector, row);
-        writer.setOffsetAndSize(element, start, holder.cursor - start);
-      }
-    }
-  }
-
-  static Converter buildConverter(BufferHolder holder, TypeDescription schema) {
-    switch (schema.getCategory()) {
-      case BOOLEAN:
-        return new BooleanConverter();
-      case BYTE:
-        return new ByteConverter();
-      case SHORT:
-        return new ShortConverter();
-      case DATE:
-      case INT:
-        return new IntConverter();
-      case LONG:
-        return new LongConverter();
-      case FLOAT:
-        return new FloatConverter();
-      case DOUBLE:
-        return new DoubleConverter();
-      case TIMESTAMP:
-        return new TimestampConverter();
-      case DECIMAL:
-        if (schema.getPrecision() <= Decimal.MAX_LONG_DIGITS()) {
-          return new Decimal18Converter(schema.getPrecision(), schema.getScale());
-        } else {
-          return new Decimal38Converter(schema.getPrecision(), schema.getScale());
-        }
-      case BINARY:
-      case STRING:
-      case CHAR:
-      case VARCHAR:
-        return new BinaryConverter(holder);
-      case STRUCT:
-        return new StructConverter(holder, schema);
-      case LIST:
-        return new ListConverter(holder, schema);
-      case MAP:
-        return new MapConverter(holder, schema);
-      default:
-        throw new IllegalArgumentException("Unhandled type " + schema);
-    }
-  }
-}
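
Every Converter in the deleted reader above repeats the same guard before
copying a value out of an ORC vector. A minimal sketch of that shared pattern
(illustrative only, not code from this PR; the class and method names are made
up):

    import org.apache.orc.storage.ql.exec.vector.LongColumnVector;

    class OrcVectorPattern {
      // Sketch of the guard used by each deleted Converter: an ORC vector with
      // isRepeating set only populates slot 0, and isNull[] is only meaningful
      // when noNulls is false. The real converters call writer.setNullAt(column)
      // or writer.setNull(element) instead of returning null.
      static Long readLong(LongColumnVector vector, int row) {
        if (vector.isRepeating) {
          row = 0;                                 // repeated vectors carry one value
        }
        if (!vector.noNulls && vector.isNull[row]) {
          return null;                             // null cell
        }
        return vector.vector[row];
      }
    }
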
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/com/netflix/iceberg/spark/data/SparkOrcWriter.java
deleted file mode 100644
index 175be10..0000000
--- a/spark/src/main/java/com/netflix/iceberg/spark/data/SparkOrcWriter.java
+++ /dev/null
@@ -1,434 +0,0 @@
-/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.netflix.iceberg.spark.data;
-
-import com.netflix.iceberg.Metrics;
-import com.netflix.iceberg.io.FileAppender;
-import com.netflix.iceberg.orc.OrcFileAppender;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.storage.common.type.HiveDecimal;
-import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
-import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
-import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
-import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
-import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
-import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
-import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.DateTimeUtils;
-import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.unsafe.types.UTF8String;
-
-import java.io.IOException;
-import java.sql.Timestamp;
-import java.util.List;
-
-/**
- * This class acts as an adaptor from an OrcFileAppender to a
- * FileAppender&lt;InternalRow&gt;.
- */
-public class SparkOrcWriter implements FileAppender<InternalRow> {
-  private final static int BATCH_SIZE = 1024;
-  private final VectorizedRowBatch batch;
-  private final OrcFileAppender writer;
-  private final Converter[] converters;
-
-  public SparkOrcWriter(OrcFileAppender writer) {
-    TypeDescription schema = writer.getSchema();
-    batch = schema.createRowBatch(BATCH_SIZE);
-    this.writer = writer;
-    converters = buildConverters(schema);
-  }
-
-  /**
-   * The interface for the conversion from Spark's SpecializedGetters to
-   * ORC's ColumnVectors.
-   */
-  interface Converter {
-    /**
-     * Take a value from the Spark data value and add it to the ORC output.
-     * @param rowId the row in the ColumnVector
-     * @param column either the column number or element number
-     * @param data either an InternalRow or ArrayData
-     * @param output the ColumnVector to put the value into
-     */
-    void addValue(int rowId, int column, SpecializedGetters data,
-                  ColumnVector output);
-  }
-
-  static class BooleanConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getBoolean(column) ? 1 : 0;
-      }
-    }
-  }
-
-  static class ByteConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getByte(column);
-      }
-    }
-  }
-
-  static class ShortConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getShort(column);
-      }
-    }
-  }
-
-  static class IntConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getInt(column);
-      }
-    }
-  }
-
-  static class LongConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((LongColumnVector) output).vector[rowId] = data.getLong(column);
-      }
-    }
-  }
-
-  static class FloatConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
-      }
-    }
-  }
-
-  static class DoubleConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
-      }
-    }
-  }
-
-  static class StringConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        byte[] value = data.getUTF8String(column).getBytes();
-        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
-      }
-    }
-  }
-
-  static class BytesConverter implements Converter {
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        // getBinary always makes a copy, so we don't need to worry about it
-        // being changed behind our back.
-        byte[] value = data.getBinary(column);
-        ((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
-      }
-    }
-  }
-
-  static class TimestampConverter implements Converter {
-
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        TimestampColumnVector cv = (TimestampColumnVector) output;
-        long micros = data.getLong(column);
-        cv.time[rowId] = (micros / 1_000_000) * 1000;
-        int nanos = (int) (micros % 1_000_000) * 1000;
-        if (nanos < 0) {
-          nanos += 1_000_000_000;
-          cv.time[rowId] -= 1000;
-        }
-        cv.nanos[rowId] = nanos;
-      }
-    }
-  }
-
-  static class Decimal18Converter implements Converter {
-    private final int precision;
-    private final int scale;
-
-    Decimal18Converter(TypeDescription schema) {
-      precision = schema.getPrecision();
-      scale = schema.getScale();
-    }
-
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DecimalColumnVector) output).vector[rowId].setFromLongAndScale(
-            data.getDecimal(column, precision, scale).toUnscaledLong(), scale);
-      }
-    }
-  }
-
-  static class Decimal38Converter implements Converter {
-    private final int precision;
-    private final int scale;
-
-    Decimal38Converter(TypeDescription schema) {
-      precision = schema.getPrecision();
-      scale = schema.getScale();
-    }
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ((DecimalColumnVector) output).vector[rowId].set(
-            HiveDecimal.create(data.getDecimal(column, precision, scale)
-                .toJavaBigDecimal()));
-      }
-    }
-  }
-
-  static class StructConverter implements Converter {
-    private final Converter[] children;
-
-    StructConverter(TypeDescription schema) {
-      children = new Converter[schema.getChildren().size()];
-      for(int c=0; c < children.length; ++c) {
-        children[c] = buildConverter(schema.getChildren().get(c));
-      }
-    }
-
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        InternalRow value = data.getStruct(column, children.length);
-        StructColumnVector cv = (StructColumnVector) output;
-        for(int c=0; c < children.length; ++c) {
-          children[c].addValue(rowId, c, value, cv.fields[c]);
-        }
-      }
-    }
-  }
-
-  static class ListConverter implements Converter {
-    private final Converter children;
-
-    ListConverter(TypeDescription schema) {
-      children = buildConverter(schema.getChildren().get(0));
-    }
-
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        ArrayData value = data.getArray(column);
-        ListColumnVector cv = (ListColumnVector) output;
-        // record the length and start of the list elements
-        cv.lengths[rowId] = value.numElements();
-        cv.offsets[rowId] = cv.childCount;
-        cv.childCount += cv.lengths[rowId];
-        // make sure the child is big enough
-        cv.child.ensureSize(cv.childCount, true);
-        // Add each element
-        for(int e=0; e < cv.lengths[rowId]; ++e) {
-          children.addValue((int) (e + cv.offsets[rowId]), e, value, cv.child);
-        }
-      }
-    }
-  }
-
-  static class MapConverter implements Converter {
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-
-    MapConverter(TypeDescription schema) {
-      keyConverter = buildConverter(schema.getChildren().get(0));
-      valueConverter = buildConverter(schema.getChildren().get(1));
-    }
-
-    public void addValue(int rowId, int column, SpecializedGetters data,
-                         ColumnVector output) {
-      if (data.isNullAt(column)) {
-        output.noNulls = false;
-        output.isNull[rowId] = true;
-      } else {
-        output.isNull[rowId] = false;
-        MapData map = data.getMap(column);
-        ArrayData key = map.keyArray();
-        ArrayData value = map.valueArray();
-        MapColumnVector cv = (MapColumnVector) output;
-        // record the length and start of the map entries
-        cv.lengths[rowId] = value.numElements();
-        cv.offsets[rowId] = cv.childCount;
-        cv.childCount += cv.lengths[rowId];
-        // make sure the child is big enough
-        cv.keys.ensureSize(cv.childCount, true);
-        cv.values.ensureSize(cv.childCount, true);
-        // Add each element
-        for(int e=0; e < cv.lengths[rowId]; ++e) {
-          int pos = (int)(e + cv.offsets[rowId]);
-          keyConverter.addValue(pos, e, key, cv.keys);
-          valueConverter.addValue(pos, e, value, cv.values);
-        }
-      }
-    }
-  }
-
-  private static Converter buildConverter(TypeDescription schema) {
-    switch (schema.getCategory()) {
-      case BOOLEAN:
-        return new BooleanConverter();
-      case BYTE:
-        return new ByteConverter();
-      case SHORT:
-        return new ShortConverter();
-      case DATE:
-      case INT:
-        return new IntConverter();
-      case LONG:
-        return new LongConverter();
-      case FLOAT:
-        return new FloatConverter();
-      case DOUBLE:
-        return new DoubleConverter();
-      case BINARY:
-        return new BytesConverter();
-      case STRING:
-      case CHAR:
-      case VARCHAR:
-        return new StringConverter();
-      case DECIMAL:
-        return schema.getPrecision() <= 18
-            ? new Decimal18Converter(schema)
-            : new Decimal38Converter(schema);
-      case TIMESTAMP:
-        return new TimestampConverter();
-      case STRUCT:
-        return new StructConverter(schema);
-      case LIST:
-        return new ListConverter(schema);
-      case MAP:
-        return new MapConverter(schema);
-    }
-    throw new IllegalArgumentException("Unhandled type " + schema);
-  }
-
-  private static Converter[] buildConverters(TypeDescription schema) {
-    if (schema.getCategory() != TypeDescription.Category.STRUCT) {
-      throw new IllegalArgumentException("Top level must be a struct " + schema);
-    }
-    List<TypeDescription> children = schema.getChildren();
-    Converter[] result = new Converter[children.size()];
-    for(int c=0; c < children.size(); ++c) {
-      result[c] = buildConverter(children.get(c));
-    }
-    return result;
-  }
-
-  @Override
-  public void add(InternalRow datum) {
-    int row = batch.size++;
-    for(int c=0; c < converters.length; ++c) {
-      converters[c].addValue(row, c, datum, batch.cols[c]);
-    }
-    if (batch.size == BATCH_SIZE) {
-      writer.add(batch);
-      batch.reset();
-    }
-  }
-
-  @Override
-  public Metrics metrics() {
-    return writer.metrics();
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (batch.size > 0) {
-      writer.add(batch);
-      batch.reset();
-    }
-    writer.close();
-  }
-}
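
The subtlest piece of the deleted writer is the timestamp split in its
TimestampConverter: Spark's internal timestamps are microseconds since the
epoch, ORC's TimestampColumnVector stores milliseconds plus a nanosecond
fraction, and Java's % operator returns a negative remainder for pre-1970
values. A small worked sketch of that arithmetic and its round trip through
the deleted reader's formula (illustrative only; the values are made up):

    class TimestampSplitExample {
      public static void main(String[] args) {
        long micros = -1_500_000L;                          // 1.5 seconds before the epoch

        // Writer side (deleted SparkOrcWriter.TimestampConverter):
        long timeMillis = (micros / 1_000_000) * 1000;      // -1000
        int nanos = (int) (micros % 1_000_000) * 1000;      // -500_000_000
        if (nanos < 0) {                                    // normalize the fraction
          nanos += 1_000_000_000;                           // 500_000_000
          timeMillis -= 1000;                               // -2000
        }

        // Reader side (deleted SparkOrcReader.TimestampConverter) recovers micros:
        long roundTrip = (timeMillis / 1000) * 1_000_000 + nanos / 1000;
        System.out.println(roundTrip == micros);            // true
      }
    }
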
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
index 78fbb80..4a008ee 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java
@@ -20,7 +20,6 @@
 package com.netflix.iceberg.spark.source;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.netflix.iceberg.CombinedScanTask;
 import com.netflix.iceberg.DataFile;
 import com.netflix.iceberg.FileScanTask;
@@ -39,10 +38,9 @@
 import com.netflix.iceberg.io.CloseableIterable;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.parquet.Parquet;
-import com.netflix.iceberg.spark.SparkExpressions;
+import com.netflix.iceberg.spark.SparkFilters;
 import com.netflix.iceberg.spark.SparkSchemaUtil;
 import com.netflix.iceberg.spark.data.SparkAvroReader;
-import com.netflix.iceberg.spark.data.SparkOrcReader;
 import com.netflix.iceberg.spark.data.SparkParquetReaders;
 import com.netflix.iceberg.types.TypeUtil;
 import com.netflix.iceberg.types.Types;
@@ -53,15 +51,14 @@
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.expressions.JoinedRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.Statistics;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownCatalystFilters;
+import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
 import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
 import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
-import org.apache.spark.sql.sources.v2.reader.SupportsScanUnsafeRow;
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
@@ -86,17 +83,16 @@
 import static scala.collection.JavaConverters.asScalaBufferConverter;
 import static scala.collection.JavaConverters.seqAsJavaListConverter;
 
-class Reader implements DataSourceReader, SupportsScanUnsafeRow, SupportsPushDownCatalystFilters,
-    SupportsPushDownRequiredColumns, SupportsReportStatistics {
+class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns,
+    SupportsReportStatistics {
 
-  private static final org.apache.spark.sql.catalyst.expressions.Expression[] NO_EXPRS =
-      new org.apache.spark.sql.catalyst.expressions.Expression[0];
+  private static final Filter[] NO_FILTERS = new Filter[0];
 
   private final Table table;
   private final SerializableConfiguration conf;
   private StructType requestedSchema = null;
   private List<Expression> filterExpressions = null;
-  private org.apache.spark.sql.catalyst.expressions.Expression[] pushedExprs = NO_EXPRS;
+  private Filter[] pushedFilters = NO_FILTERS;
 
   // lazy variables
   private Schema schema = null;
@@ -133,11 +129,11 @@ public StructType readSchema() {
   }
 
   @Override
-  public List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories() {
+  public List<InputPartition<InternalRow>> planInputPartitions() {
     String tableSchemaString = SchemaParser.toJson(table.schema());
     String expectedSchemaString = SchemaParser.toJson(lazySchema());
 
-    List<DataReaderFactory<UnsafeRow>> readTasks = Lists.newArrayList();
+    List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
     for (CombinedScanTask task : tasks()) {
       readTasks.add(new ReadTask(task, tableSchemaString, expectedSchemaString, conf));
     }
@@ -146,16 +142,14 @@ public StructType readSchema() {
   }
 
   @Override
-  public org.apache.spark.sql.catalyst.expressions.Expression[] pushCatalystFilters(
-      org.apache.spark.sql.catalyst.expressions.Expression[] filters) {
+  public Filter[] pushFilters(Filter[] filters) {
     this.tasks = null; // invalidate cached tasks, if present
 
     List<Expression> expressions = Lists.newArrayListWithExpectedSize(filters.length);
-    List<org.apache.spark.sql.catalyst.expressions.Expression> pushed =
-        Lists.newArrayListWithExpectedSize(filters.length);
+    List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
 
-    for (org.apache.spark.sql.catalyst.expressions.Expression filter : filters) {
-      Expression expr = SparkExpressions.convert(filter);
+    for (Filter filter : filters) {
+      Expression expr = SparkFilters.convert(filter);
       if (expr != null) {
         expressions.add(expr);
         pushed.add(filter);
@@ -163,7 +157,7 @@ public StructType readSchema() {
     }
 
     this.filterExpressions = expressions;
-    this.pushedExprs = pushed.toArray(new org.apache.spark.sql.catalyst.expressions.Expression[0]);
+    this.pushedFilters = pushed.toArray(new Filter[0]);
 
     // invalidate the schema that will be projected
     this.schema = null;
@@ -175,8 +169,8 @@ public StructType readSchema() {
   }
 
   @Override
-  public org.apache.spark.sql.catalyst.expressions.Expression[] pushedCatalystFilters() {
-    return pushedExprs;
+  public Filter[] pushedFilters() {
+    return pushedFilters;
   }
 
   @Override
@@ -189,7 +183,7 @@ public void pruneColumns(StructType requestedSchema) {
   }
 
   @Override
-  public Statistics getStatistics() {
+  public Statistics estimateStatistics() {
     long sizeInBytes = 0L;
     long numRows = 0L;
 
@@ -230,7 +224,7 @@ public String toString() {
         table, lazySchema().asStruct(), filterExpressions);
   }
 
-  private static class ReadTask implements DataReaderFactory<UnsafeRow>, Serializable {
+  private static class ReadTask implements InputPartition<InternalRow>, Serializable {
     private final CombinedScanTask task;
     private final String tableSchemaString;
     private final String expectedSchemaString;
@@ -248,7 +242,7 @@ private ReadTask(CombinedScanTask task, String tableSchemaString, String expecte
     }
 
     @Override
-    public DataReader<UnsafeRow> createDataReader() {
+    public InputPartitionReader<InternalRow> createPartitionReader() {
       return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), conf.value());
     }
 
@@ -267,7 +261,7 @@ private Schema lazyExpectedSchema() {
     }
   }
 
-  private static class TaskDataReader implements DataReader<UnsafeRow> {
+  private static class TaskDataReader implements InputPartitionReader<InternalRow> {
     // for some reason, the apply method can't be called from Java without reflection
     private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
         .impl(UnsafeProjection.class, InternalRow.class)
@@ -278,9 +272,9 @@ private Schema lazyExpectedSchema() {
     private final Schema expectedSchema;
     private final Configuration conf;
 
-    private Iterator<UnsafeRow> currentIterator = null;
+    private Iterator<InternalRow> currentIterator = null;
     private Closeable currentCloseable = null;
-    private UnsafeRow current = null;
+    private InternalRow current = null;
 
     public TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, Configuration conf) {
       this.tasks = task.files().iterator();
@@ -309,7 +303,7 @@ public boolean next() throws IOException {
     }
 
     @Override
-    public UnsafeRow get() {
+    public InternalRow get() {
       return current;
     }
 
@@ -324,13 +318,13 @@ public void close() throws IOException {
       }
     }
 
-    private Iterator<UnsafeRow> open(FileScanTask task) {
+    private Iterator<InternalRow> open(FileScanTask task) {
       DataFile file = task.file();
 
       // schema or rows returned by readers
       Schema finalSchema = expectedSchema;
       PartitionSpec spec = task.spec();
-      Set<Integer> idColumns = identitySourceIds(spec);
+      Set<Integer> idColumns = spec.identitySourceIds();
 
       // schema needed for the projection and filtering
       Schema requiredSchema = prune(tableSchema, convert(finalSchema), task.residual());
@@ -365,6 +359,7 @@ public void close() throws IOException {
         iter = open(task, finalSchema, conf);
       }
 
+      // TODO: remove the projection by reporting the iterator's schema back to Spark
       return transform(iter,
           APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke);
     }
@@ -391,29 +386,11 @@ private static UnsafeProjection projection(Schema finalSchema, Schema readSchema
           asScalaBufferConverter(attrs).asScala().toSeq());
     }
 
-    private static Set<Integer> identitySourceIds(PartitionSpec spec) {
-      Set<Integer> sourceIds = Sets.newHashSet();
-      List<PartitionField> fields = spec.fields();
-      for (int i = 0; i < fields.size(); i += 1) {
-        PartitionField field = fields.get(i);
-        if ("identity".equals(field.transform().toString())) {
-          sourceIds.add(field.sourceId());
-        }
-      }
-
-      return sourceIds;
-    }
-
     private Iterator<InternalRow> open(FileScanTask task, Schema readSchema,
                                        Configuration conf) {
       InputFile location = HadoopInputFile.fromLocation(task.file().path(), conf);
       CloseableIterable<InternalRow> iter;
       switch (task.file().format()) {
-        case ORC:
-          SparkOrcReader reader = new SparkOrcReader(location, task, readSchema);
-          this.currentCloseable = reader;
-          return reader;
-
         case PARQUET:
           iter = newParquetIterable(location, task, readSchema);
           break;
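
   For context on the Reader.java hunks above: Spark 2.4 renamed the DataSourceV2 read-side classes, so each read task now hands back an InputPartitionReader<InternalRow> from createPartitionReader() instead of a DataReader<UnsafeRow> from createDataReader(). The class below is a minimal, self-contained sketch of just that contract; SingleFilePartition, its path field, and the three hard-coded rows are illustrative assumptions, not code from this PR.

       import org.apache.spark.sql.catalyst.InternalRow;
       import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
       import org.apache.spark.sql.sources.v2.reader.InputPartition;
       import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
       import org.apache.spark.unsafe.types.UTF8String;

       class SingleFilePartition implements InputPartition<InternalRow> {
         private final String path; // stand-in for the data file this partition would scan

         SingleFilePartition(String path) {
           this.path = path;
         }

         @Override
         public InputPartitionReader<InternalRow> createPartitionReader() {
           return new InputPartitionReader<InternalRow>() {
             private int i = 0;
             private InternalRow current = null;

             @Override
             public boolean next() {
               if (i >= 3) {
                 return false; // pretend the file holds exactly three rows
               }
               current = new GenericInternalRow(
                   new Object[] { (long) i, UTF8String.fromString(path) });
               i += 1;
               return true;
             }

             @Override
             public InternalRow get() {
               return current;
             }

             @Override
             public void close() {
               // a real reader would close the underlying file iterable here
             }
           };
         }
       }
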
diff --git a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
index e729474..bed2cf6 100644
--- a/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/com/netflix/iceberg/spark/source/Writer.java
@@ -19,7 +19,6 @@
 
 package com.netflix.iceberg.spark.source;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -40,10 +39,8 @@
 import com.netflix.iceberg.io.FileAppender;
 import com.netflix.iceberg.io.InputFile;
 import com.netflix.iceberg.io.OutputFile;
-import com.netflix.iceberg.orc.ORC;
 import com.netflix.iceberg.parquet.Parquet;
 import com.netflix.iceberg.spark.data.SparkAvroWriter;
-import com.netflix.iceberg.spark.data.SparkOrcWriter;
 import com.netflix.iceberg.transforms.Transform;
 import com.netflix.iceberg.transforms.Transforms;
 import com.netflix.iceberg.types.Types.StringType;
@@ -56,7 +53,6 @@
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.util.SerializableConfiguration;
 import org.slf4j.Logger;
@@ -86,7 +82,7 @@
 import static com.netflix.iceberg.spark.SparkSchemaUtil.convert;
 
 // TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
-class Writer implements DataSourceWriter, SupportsWriteInternalRow {
+class Writer implements DataSourceWriter {
   private static final Transform<String, Integer> HASH_FUNC = Transforms
       .bucket(StringType.get(), Integer.MAX_VALUE);
   private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
@@ -102,7 +98,7 @@
   }
 
   @Override
-  public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
+  public DataWriterFactory<InternalRow> createWriterFactory() {
     return new WriterFactory(table.spec(), format, dataLocation(), table.properties(), conf);
   }
 
@@ -218,9 +214,8 @@ public String toString() {
     }
 
     @Override
-    public DataWriter<InternalRow> createDataWriter(int partitionId, int attemptNumber) {
-      String filename = format.addExtension(String.format("%05d-%d-%s",
-          partitionId, attemptNumber, uuid));
+    public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
+      String filename = format.addExtension(String.format("%05d-%d-%s", partitionId, taskId, uuid));
       AppenderFactory<InternalRow> factory = new SparkAppenderFactory();
       if (spec.fields().isEmpty()) {
         return new UnpartitionedWriter(lazyDataPath(), filename, format, conf.value(), factory);
@@ -301,13 +296,6 @@ private Path lazyDataPath() {
                   .schema(schema)
                   .build();
 
-            case ORC: {
-              @SuppressWarnings("unchecked")
-              SparkOrcWriter writer = new SparkOrcWriter(ORC.write(file)
-                  .schema(schema)
-                  .build());
-              return writer;
-            }
             default:
               throw new UnsupportedOperationException("Cannot write unknown format: " + format);
           }
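
   The Writer.java hunks above track two Spark 2.4 changes on the write side: SupportsWriteInternalRow is gone, so DataSourceWriter returns an InternalRow writer factory from createWriterFactory() directly, and createDataWriter() now receives a long taskId and epochId instead of an int attemptNumber. Below is a hedged sketch of that factory shape only; FilenameOnlyWriterFactory is an invented name and the writer performs no real file I/O.

       import java.util.UUID;
       import org.apache.spark.sql.catalyst.InternalRow;
       import org.apache.spark.sql.sources.v2.writer.DataWriter;
       import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
       import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

       class FilenameOnlyWriterFactory implements DataWriterFactory<InternalRow> {
         private final String uuid = UUID.randomUUID().toString();

         @Override
         public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
           // same naming scheme the Writer change above switches to: partition id, task id, uuid
           String filename = String.format("%05d-%d-%s", partitionId, taskId, uuid);

           return new DataWriter<InternalRow>() {
             private long recordCount = 0;

             @Override
             public void write(InternalRow record) {
               recordCount += 1; // a real writer would append the record to 'filename'
             }

             @Override
             public WriterCommitMessage commit() {
               // a real writer would close the file and report 'filename' back to the driver
               return new WriterCommitMessage() { };
             }

             @Override
             public void abort() {
               // a real writer would delete the partially written file
             }
           };
         }
       }
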
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/data/CodegenExamples.java b/spark/src/test/java/com/netflix/iceberg/spark/data/CodegenExamples.java
deleted file mode 100644
index 5ddef24..0000000
--- a/spark/src/test/java/com/netflix/iceberg/spark/data/CodegenExamples.java
+++ /dev/null
@@ -1,707 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.netflix.iceberg.spark.data;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
-import org.apache.spark.sql.catalyst.expressions.UnsafeMapData;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.types.UTF8String;
-
-public class CodegenExamples {
-
-
-  class Example1 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-
-    public Example1(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(2);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 2);
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      long value = isNull ? -1L : (i.getLong(0));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        rowWriter.write(0, value);
-      }
-
-
-      boolean isNull1 = i.isNullAt(1);
-      UTF8String value1 = isNull1 ? null : (i.getUTF8String(1));
-      if (isNull1) {
-        rowWriter.setNullAt(1);
-      } else {
-        rowWriter.write(1, value1);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-
-  class Example2 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter1;
-
-    public Example2(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(1);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-      this.rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      InternalRow value = isNull ? null : (i.getStruct(0, 1));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        // Remember the current cursor so that we can calculate how many bytes are
-        // written later.
-        final int tmpCursor = holder.cursor;
-
-        if (value instanceof UnsafeRow) {
-
-          final int sizeInBytes = ((UnsafeRow) value).getSizeInBytes();
-          // grow the global buffer before writing data.
-          holder.grow(sizeInBytes);
-          ((UnsafeRow) value).writeToMemory(holder.buffer, holder.cursor);
-          holder.cursor += sizeInBytes;
-
-        } else {
-          rowWriter1.reset();
-
-
-          boolean isNull1 = value.isNullAt(0);
-          float value1 = isNull1 ? -1.0f : value.getFloat(0);
-
-          if (isNull1) {
-            rowWriter1.setNullAt(0);
-          } else {
-            rowWriter1.write(0, value1);
-          }
-        }
-
-        rowWriter.setOffsetAndSize(0, tmpCursor, holder.cursor - tmpCursor);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-
-  class Example3 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter1;
-
-    public Example3(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(1);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-      this.rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 2);
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      InternalRow value = isNull ? null : (i.getStruct(0, 2));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        // Remember the current cursor so that we can calculate how many bytes are
-        // written later.
-        final int tmpCursor = holder.cursor;
-
-        if (value instanceof UnsafeRow) {
-
-          final int sizeInBytes = ((UnsafeRow) value).getSizeInBytes();
-          // grow the global buffer before writing data.
-          holder.grow(sizeInBytes);
-          ((UnsafeRow) value).writeToMemory(holder.buffer, holder.cursor);
-          holder.cursor += sizeInBytes;
-
-        } else {
-          rowWriter1.reset();
-
-
-          boolean isNull1 = value.isNullAt(0);
-          float value1 = isNull1 ? -1.0f : value.getFloat(0);
-
-          if (isNull1) {
-            rowWriter1.setNullAt(0);
-          } else {
-            rowWriter1.write(0, value1);
-          }
-
-
-          boolean isNull2 = value.isNullAt(1);
-          float value2 = isNull2 ? -1.0f : value.getFloat(1);
-
-          if (isNull2) {
-            rowWriter1.setNullAt(1);
-          } else {
-            rowWriter1.write(1, value2);
-          }
-        }
-
-        rowWriter.setOffsetAndSize(0, tmpCursor, holder.cursor - tmpCursor);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-
-
-  class Example4 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter1;
-
-    public Example4(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(1);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-      this.arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-      this.arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      MapData value = isNull ? null : (i.getMap(0));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        // Remember the current cursor so that we can calculate how many bytes are
-        // written later.
-        final int tmpCursor = holder.cursor;
-
-        if (value instanceof UnsafeMapData) {
-
-          final int sizeInBytes = ((UnsafeMapData) value).getSizeInBytes();
-          // grow the global buffer before writing data.
-          holder.grow(sizeInBytes);
-          ((UnsafeMapData) value).writeToMemory(holder.buffer, holder.cursor);
-          holder.cursor += sizeInBytes;
-
-        } else {
-          final ArrayData keys = value.keyArray();
-          final ArrayData values = value.valueArray();
-
-          // preserve 8 bytes to write the key array numBytes later.
-          holder.grow(8);
-          holder.cursor += 8;
-
-          // Remember the current cursor so that we can write numBytes of key array later.
-          final int tmpCursor1 = holder.cursor;
-
-
-          if (keys instanceof UnsafeArrayData) {
-
-            final int sizeInBytes1 = ((UnsafeArrayData) keys).getSizeInBytes();
-            // grow the global buffer before writing data.
-            holder.grow(sizeInBytes1);
-            ((UnsafeArrayData) keys).writeToMemory(holder.buffer, holder.cursor);
-            holder.cursor += sizeInBytes1;
-
-          } else {
-            final int numElements = keys.numElements();
-            arrayWriter.initialize(holder, numElements, 8);
-
-            for (int index = 0; index < numElements; index++) {
-              if (keys.isNullAt(index)) {
-                arrayWriter.setNull(index);
-              } else {
-                final UTF8String element = keys.getUTF8String(index);
-                arrayWriter.write(index, element);
-              }
-            }
-          }
-
-          // Write the numBytes of key array into the first 8 bytes.
-          Platform.putLong(holder.buffer, tmpCursor1 - 8, holder.cursor - tmpCursor1);
-
-
-          if (values instanceof UnsafeArrayData) {
-
-            final int sizeInBytes2 = ((UnsafeArrayData) values).getSizeInBytes();
-            // grow the global buffer before writing data.
-            holder.grow(sizeInBytes2);
-            ((UnsafeArrayData) values).writeToMemory(holder.buffer, holder.cursor);
-            holder.cursor += sizeInBytes2;
-
-          } else {
-            final int numElements1 = values.numElements();
-            arrayWriter1.initialize(holder, numElements1, 8);
-
-            for (int index1 = 0; index1 < numElements1; index1++) {
-              if (values.isNullAt(index1)) {
-                arrayWriter1.setNull(index1);
-              } else {
-                final UTF8String element1 = values.getUTF8String(index1);
-                arrayWriter1.write(index1, element1);
-              }
-            }
-          }
-
-        }
-
-        rowWriter.setOffsetAndSize(0, tmpCursor, holder.cursor - tmpCursor);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-
-
-  class Example5 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter1;
-
-    public Example5(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(1);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-      this.arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-      this.rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 2);
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      ArrayData value = isNull ? null : (i.getArray(0));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        // Remember the current cursor so that we can calculate how many bytes are
-        // written later.
-        final int tmpCursor = holder.cursor;
-
-        if (value instanceof UnsafeArrayData) {
-
-          final int sizeInBytes1 = ((UnsafeArrayData) value).getSizeInBytes();
-          // grow the global buffer before writing data.
-          holder.grow(sizeInBytes1);
-          ((UnsafeArrayData) value).writeToMemory(holder.buffer, holder.cursor);
-          holder.cursor += sizeInBytes1;
-
-        } else {
-          final int numElements = value.numElements();
-          arrayWriter.initialize(holder, numElements, 8);
-
-          for (int index = 0; index < numElements; index++) {
-            if (value.isNullAt(index)) {
-              arrayWriter.setNull(index);
-            } else {
-              final InternalRow element = value.getStruct(index, 2);
-
-              final int tmpCursor1 = holder.cursor;
-
-              if (element instanceof UnsafeRow) {
-
-                final int sizeInBytes = ((UnsafeRow) element).getSizeInBytes();
-                // grow the global buffer before writing data.
-                holder.grow(sizeInBytes);
-                ((UnsafeRow) element).writeToMemory(holder.buffer, holder.cursor);
-                holder.cursor += sizeInBytes;
-
-              } else {
-                rowWriter1.reset();
-
-
-                boolean isNull1 = element.isNullAt(0);
-                int value1 = isNull1 ? -1 : element.getInt(0);
-
-                if (isNull1) {
-                  rowWriter1.setNullAt(0);
-                } else {
-                  rowWriter1.write(0, value1);
-                }
-
-
-                boolean isNull2 = element.isNullAt(1);
-                int value2 = isNull2 ? -1 : element.getInt(1);
-
-                if (isNull2) {
-                  rowWriter1.setNullAt(1);
-                } else {
-                  rowWriter1.write(1, value2);
-                }
-              }
-
-              arrayWriter.setOffsetAndSize(index, tmpCursor1, holder.cursor - tmpCursor1);
-
-            }
-          }
-        }
-
-        rowWriter.setOffsetAndSize(0, tmpCursor, holder.cursor - tmpCursor);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-
-  class Example6 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter1;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter1;
-
-    public Example6(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(1);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-      this.arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-      this.arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-      this.rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 2);
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      MapData value = isNull ? null : (i.getMap(0));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        // Remember the current cursor so that we can calculate how many bytes are
-        // written later.
-        final int tmpCursor = holder.cursor;
-
-        if (value instanceof UnsafeMapData) {
-
-          final int sizeInBytes = ((UnsafeMapData) value).getSizeInBytes();
-          // grow the global buffer before writing data.
-          holder.grow(sizeInBytes);
-          ((UnsafeMapData) value).writeToMemory(holder.buffer, holder.cursor);
-          holder.cursor += sizeInBytes;
-
-        } else {
-          final ArrayData keys = value.keyArray();
-          final ArrayData values = value.valueArray();
-
-          // preserve 8 bytes to write the key array numBytes later.
-          holder.grow(8);
-          holder.cursor += 8;
-
-          // Remember the current cursor so that we can write numBytes of key array later.
-          final int tmpCursor1 = holder.cursor;
-
-
-          if (keys instanceof UnsafeArrayData) {
-
-            final int sizeInBytes1 = ((UnsafeArrayData) keys).getSizeInBytes();
-            // grow the global buffer before writing data.
-            holder.grow(sizeInBytes1);
-            ((UnsafeArrayData) keys).writeToMemory(holder.buffer, holder.cursor);
-            holder.cursor += sizeInBytes1;
-
-          } else {
-            final int numElements = keys.numElements();
-            arrayWriter.initialize(holder, numElements, 8);
-
-            for (int index = 0; index < numElements; index++) {
-              if (keys.isNullAt(index)) {
-                arrayWriter.setNull(index);
-              } else {
-                final UTF8String element = keys.getUTF8String(index);
-                arrayWriter.write(index, element);
-              }
-            }
-          }
-
-          // Write the numBytes of key array into the first 8 bytes.
-          Platform.putLong(holder.buffer, tmpCursor1 - 8, holder.cursor - tmpCursor1);
-
-
-          if (values instanceof UnsafeArrayData) {
-
-            final int sizeInBytes3 = ((UnsafeArrayData) values).getSizeInBytes();
-            // grow the global buffer before writing data.
-            holder.grow(sizeInBytes3);
-            ((UnsafeArrayData) values).writeToMemory(holder.buffer, holder.cursor);
-            holder.cursor += sizeInBytes3;
-
-          } else {
-            final int numElements1 = values.numElements();
-            arrayWriter1.initialize(holder, numElements1, 8);
-
-            for (int index1 = 0; index1 < numElements1; index1++) {
-              if (values.isNullAt(index1)) {
-                arrayWriter1.setNull(index1);
-              } else {
-                final InternalRow element1 = values.getStruct(index1, 2);
-
-                final int tmpCursor3 = holder.cursor;
-
-                if (element1 instanceof UnsafeRow) {
-
-                  final int sizeInBytes2 = ((UnsafeRow) element1).getSizeInBytes();
-                  // grow the global buffer before writing data.
-                  holder.grow(sizeInBytes2);
-                  ((UnsafeRow) element1).writeToMemory(holder.buffer, holder.cursor);
-                  holder.cursor += sizeInBytes2;
-
-                } else {
-                  rowWriter1.reset();
-
-
-                  boolean isNull1 = element1.isNullAt(0);
-                  float value1 = isNull1 ? -1.0f : element1.getFloat(0);
-
-                  if (isNull1) {
-                    rowWriter1.setNullAt(0);
-                  } else {
-                    rowWriter1.write(0, value1);
-                  }
-
-
-                  boolean isNull2 = element1.isNullAt(1);
-                  float value2 = isNull2 ? -1.0f : element1.getFloat(1);
-
-                  if (isNull2) {
-                    rowWriter1.setNullAt(1);
-                  } else {
-                    rowWriter1.write(1, value2);
-                  }
-                }
-
-                arrayWriter1.setOffsetAndSize(index1, tmpCursor3, holder.cursor - tmpCursor3);
-
-              }
-            }
-          }
-
-        }
-
-        rowWriter.setOffsetAndSize(0, tmpCursor, holder.cursor - tmpCursor);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-
-  class Example7 extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
-
-    private Object[] references;
-    private UnsafeRow result;
-    private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter;
-    private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter arrayWriter1;
-
-    public Example7(Object[] references) {
-      this.references = references;
-      result = new UnsafeRow(1);
-      this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
-      this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 1);
-      this.arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-      this.arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
-
-    }
-
-    public void initialize(int partitionIndex) {
-
-    }
-
-    public UnsafeRow apply(InternalRow i) {
-      holder.reset();
-
-      rowWriter.zeroOutNullBytes();
-
-
-      boolean isNull = i.isNullAt(0);
-      MapData value = isNull ? null : (i.getMap(0));
-      if (isNull) {
-        rowWriter.setNullAt(0);
-      } else {
-        // Remember the current cursor so that we can calculate how many bytes are
-        // written later.
-        final int tmpCursor = holder.cursor;
-
-        if (value instanceof UnsafeMapData) {
-
-          final int sizeInBytes = ((UnsafeMapData) value).getSizeInBytes();
-          // grow the global buffer before writing data.
-          holder.grow(sizeInBytes);
-          ((UnsafeMapData) value).writeToMemory(holder.buffer, holder.cursor);
-          holder.cursor += sizeInBytes;
-
-        } else {
-          final ArrayData keys = value.keyArray();
-          final ArrayData values = value.valueArray();
-
-          // preserve 8 bytes to write the key array numBytes later.
-          holder.grow(8);
-          holder.cursor += 8;
-
-          // Remember the current cursor so that we can write numBytes of key array later.
-          final int tmpCursor1 = holder.cursor;
-
-
-          if (keys instanceof UnsafeArrayData) {
-
-            final int sizeInBytes1 = ((UnsafeArrayData) keys).getSizeInBytes();
-            // grow the global buffer before writing data.
-            holder.grow(sizeInBytes1);
-            ((UnsafeArrayData) keys).writeToMemory(holder.buffer, holder.cursor);
-            holder.cursor += sizeInBytes1;
-
-          } else {
-            final int numElements = keys.numElements();
-            arrayWriter.initialize(holder, numElements, 8);
-
-            for (int index = 0; index < numElements; index++) {
-              if (keys.isNullAt(index)) {
-                arrayWriter.setNull(index);
-              } else {
-                final UTF8String element = keys.getUTF8String(index);
-                arrayWriter.write(index, element);
-              }
-            }
-          }
-
-          // Write the numBytes of key array into the first 8 bytes.
-          Platform.putLong(holder.buffer, tmpCursor1 - 8, holder.cursor - tmpCursor1);
-
-
-          if (values instanceof UnsafeArrayData) {
-
-            final int sizeInBytes2 = ((UnsafeArrayData) values).getSizeInBytes();
-            // grow the global buffer before writing data.
-            holder.grow(sizeInBytes2);
-            ((UnsafeArrayData) values).writeToMemory(holder.buffer, holder.cursor);
-            holder.cursor += sizeInBytes2;
-
-          } else {
-            final int numElements1 = values.numElements();
-            arrayWriter1.initialize(holder, numElements1, 8);
-
-            for (int index1 = 0; index1 < numElements1; index1++) {
-              if (values.isNullAt(index1)) {
-                arrayWriter1.setNull(index1);
-              } else {
-                final UTF8String element1 = values.getUTF8String(index1);
-                arrayWriter1.write(index1, element1);
-              }
-            }
-          }
-
-        }
-
-        rowWriter.setOffsetAndSize(0, tmpCursor, holder.cursor - tmpCursor);
-      }
-      result.setTotalSize(holder.totalSize());
-      return result;
-    }
-  }
-}
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java
index a93675f..3b0d32b 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestDataFrameWrites.java
@@ -63,7 +63,6 @@
   public static Object[][] parameters() {
     return new Object[][] {
         new Object[] { "parquet" },
-        new Object[] { "orc" },
         new Object[] { "avro" }
     };
   }
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java
index e459ea6..e0c3fa1 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java
@@ -20,6 +20,7 @@
 package com.netflix.iceberg.spark.source;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.netflix.iceberg.DataFile;
 import com.netflix.iceberg.DataFiles;
@@ -29,12 +30,11 @@
 import com.netflix.iceberg.Table;
 import com.netflix.iceberg.avro.Avro;
 import com.netflix.iceberg.avro.AvroSchemaUtil;
-import com.netflix.iceberg.expressions.Expressions;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
 import com.netflix.iceberg.expressions.Literal;
 import com.netflix.iceberg.hadoop.HadoopTables;
 import com.netflix.iceberg.io.FileAppender;
 import com.netflix.iceberg.parquet.Parquet;
-import com.netflix.iceberg.spark.SparkExpressions;
 import com.netflix.iceberg.spark.data.TestHelpers;
 import com.netflix.iceberg.transforms.Transform;
 import com.netflix.iceberg.transforms.Transforms;
@@ -45,16 +45,19 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.api.java.UDF1;
-import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.sources.And;
+import org.apache.spark.sql.sources.EqualTo;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.sources.GreaterThan;
+import org.apache.spark.sql.sources.LessThan;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
-import org.apache.spark.sql.sources.v2.reader.SupportsPushDownCatalystFilters;
-import org.apache.spark.sql.sources.v2.reader.SupportsScanUnsafeRow;
-import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
 import org.apache.spark.sql.types.IntegerType$;
-import org.apache.spark.sql.types.StringType$;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -74,10 +77,7 @@
 import static com.netflix.iceberg.Files.localOutput;
 import static org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp;
 import static org.apache.spark.sql.functions.callUDF;
-import static org.apache.spark.sql.functions.col;
 import static org.apache.spark.sql.functions.column;
-import static org.apache.spark.sql.functions.lit;
-import static org.apache.spark.sql.functions.to_date;
 
 @RunWith(Parameterized.class)
 public class TestFilteredScan {
@@ -102,10 +102,6 @@
       .hour("ts")
       .build();
 
-  private static final PartitionSpec PARTITION_BY_FIRST_LETTER = PartitionSpec.builderFor(SCHEMA)
-      .truncate("data", 1)
-      .build();
-
   private static SparkSession spark = null;
 
   @BeforeClass
@@ -118,18 +114,13 @@ public static void startSpark() {
 
     Transform<Long, Integer> day = Transforms.day(Types.TimestampType.withZone());
     spark.udf().register("ts_day",
-        (UDF1<Timestamp, Integer>) timestamp -> day.apply(fromJavaTimestamp(timestamp)),
+        (UDF1<Timestamp, Integer>) timestamp -> day.apply((Long) fromJavaTimestamp(timestamp)),
         IntegerType$.MODULE$);
 
     Transform<Long, Integer> hour = Transforms.hour(Types.TimestampType.withZone());
     spark.udf().register("ts_hour",
-        (UDF1<Timestamp, Integer>) timestamp -> hour.apply(fromJavaTimestamp(timestamp)),
+        (UDF1<Timestamp, Integer>) timestamp -> hour.apply((Long) fromJavaTimestamp(timestamp)),
         IntegerType$.MODULE$);
-
-    Transform<CharSequence, CharSequence> trunc1 = Transforms.truncate(Types.StringType.get(), 1);
-    spark.udf().register("trunc1",
-        (UDF1<CharSequence, CharSequence>) str -> trunc1.apply(str.toString()),
-        StringType$.MODULE$);
   }
 
   @AfterClass
@@ -216,9 +207,9 @@ public void testUnpartitionedIDFilters() {
     for (int i = 0; i < 10; i += 1) {
       DataSourceReader reader = source.createReader(options);
 
-      pushFilters(reader, Expressions.equal("id", i));
+      pushFilters(reader, EqualTo.apply("id", i));
 
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
       Assert.assertEquals("Should only create one task for a small file", 1, tasks.size());
 
       // validate row filtering
@@ -237,9 +228,9 @@ public void testUnpartitionedTimestampFilter() {
 
     DataSourceReader reader = source.createReader(options);
 
-    pushFilters(reader, Expressions.lessThan("ts", "2017-12-22T00:00:00+00:00"));
+    pushFilters(reader, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
 
-    List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+    List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
     Assert.assertEquals("Should only create one task for a small file", 1, tasks.size());
 
     assertEqualsSafe(SCHEMA.asStruct(), expected(5,6,7,8,9),
@@ -257,14 +248,14 @@ public void testBucketPartitionedIDFilters() {
     IcebergSource source = new IcebergSource();
     DataSourceReader unfiltered = source.createReader(options);
     Assert.assertEquals("Unfiltered table should created 4 read tasks",
-        4, planTasks(unfiltered).size());
+        4, unfiltered.planInputPartitions().size());
 
     for (int i = 0; i < 10; i += 1) {
       DataSourceReader reader = source.createReader(options);
 
-      pushFilters(reader, Expressions.equal("id", i));
+      pushFilters(reader, EqualTo.apply("id", i));
 
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
 
       // validate predicate push-down
       Assert.assertEquals("Should create one task for a single bucket", 1, tasks.size());
@@ -282,18 +273,17 @@ public void testDayPartitionedTimestampFilters() {
         "path", location.toString())
     );
 
-    int day = Literal.of("2017-12-21").<Integer>to(Types.DateType.get()).value();
     IcebergSource source = new IcebergSource();
     DataSourceReader unfiltered = source.createReader(options);
     Assert.assertEquals("Unfiltered table should created 2 read tasks",
-        2, planTasks(unfiltered).size());
+        2, unfiltered.planInputPartitions().size());
 
     {
       DataSourceReader reader = source.createReader(options);
 
-      pushFilters(reader, Expressions.lessThan("ts", "2017-12-22T00:00:00+00:00"));
+      pushFilters(reader, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
 
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
       Assert.assertEquals("Should create one task for 2017-12-21", 1, tasks.size());
 
       assertEqualsSafe(SCHEMA.asStruct(), expected(5, 6, 7, 8, 9),
@@ -303,35 +293,11 @@ public void testDayPartitionedTimestampFilters() {
     {
       DataSourceReader reader = source.createReader(options);
 
-      pushFilters(reader, col("ts").cast(DateType$.MODULE$).$eq$eq$eq(lit(day)).expr());
-
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
-      Assert.assertEquals("Should create one task for 2017-12-21", 1, tasks.size());
-
-      assertEqualsSafe(SCHEMA.asStruct(), expected(5, 6, 7, 8, 9),
-          read(location.toString(), "cast(ts as date) = date '2017-12-21'"));
-    }
+      pushFilters(reader, And.apply(
+          GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"),
+          LessThan.apply("ts", "2017-12-22T08:00:00+00:00")));
 
-    {
-      DataSourceReader reader = source.createReader(options);
-
-      pushFilters(reader, to_date(col("ts")).$eq$eq$eq(lit(day)).expr());
-
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
-      Assert.assertEquals("Should create one task for 2017-12-21", 1, tasks.size());
-
-      assertEqualsSafe(SCHEMA.asStruct(), expected(5, 6, 7, 8, 9),
-          read(location.toString(), "to_date(ts) = date '2017-12-21'"));
-    }
-
-    {
-      DataSourceReader reader = source.createReader(options);
-
-      pushFilters(reader, Expressions.and(
-          Expressions.greaterThan("ts", "2017-12-22T06:00:00+00:00"),
-          Expressions.lessThan("ts", "2017-12-22T08:00:00+00:00")));
-
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
       Assert.assertEquals("Should create one task for 2017-12-22", 1, tasks.size());
 
       assertEqualsSafe(SCHEMA.asStruct(), expected(1, 2), read(location.toString(),
@@ -351,14 +317,14 @@ public void testHourPartitionedTimestampFilters() {
     IcebergSource source = new IcebergSource();
     DataSourceReader unfiltered = source.createReader(options);
     Assert.assertEquals("Unfiltered table should created 9 read tasks",
-        9, planTasks(unfiltered).size());
+        9, unfiltered.planInputPartitions().size());
 
     {
       DataSourceReader reader = source.createReader(options);
 
-      pushFilters(reader, Expressions.lessThan("ts", "2017-12-22T00:00:00+00:00"));
+      pushFilters(reader, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
 
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
       Assert.assertEquals("Should create 4 tasks for 2017-12-21: 15, 17, 21, 22", 4, tasks.size());
 
       assertEqualsSafe(SCHEMA.asStruct(), expected(8, 9, 7, 6, 5),
@@ -368,11 +334,11 @@ public void testHourPartitionedTimestampFilters() {
     {
       DataSourceReader reader = source.createReader(options);
 
-      pushFilters(reader, Expressions.and(
-          Expressions.greaterThan("ts", "2017-12-22T06:00:00+00:00"),
-          Expressions.lessThan("ts", "2017-12-22T08:00:00+00:00")));
+      pushFilters(reader, And.apply(
+          GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"),
+          LessThan.apply("ts", "2017-12-22T08:00:00+00:00")));
 
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
+      List<InputPartition<InternalRow>> tasks = reader.planInputPartitions();
       Assert.assertEquals("Should create 2 tasks for 2017-12-22: 6, 7", 2, tasks.size());
 
       assertEqualsSafe(SCHEMA.asStruct(), expected(2, 1), read(location.toString(),
@@ -381,41 +347,6 @@ public void testHourPartitionedTimestampFilters() {
     }
   }
 
-  @Test
-  public void testTrunctateDataPartitionedFilters() {
-    File location = buildPartitionedTable("trunc", PARTITION_BY_FIRST_LETTER, "trunc1", "data");
-
-    DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
-        "path", location.toString())
-    );
-
-    IcebergSource source = new IcebergSource();
-    DataSourceReader unfiltered = source.createReader(options);
-    Assert.assertEquals("Unfiltered table should have created 9 read tasks",
-        9, planTasks(unfiltered).size());
-
-    {
-      DataSourceReader reader = source.createReader(options);
-
-      pushFilters(reader, Expressions.equal("data", "goldfish"));
-
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
-      Assert.assertEquals("Should create 1 task for 'goldfish' (g)", 1, tasks.size());
-    }
-
-    {
-      DataSourceReader reader = source.createReader(options);
-
-      pushFilters(reader, col("data").$eq$eq$eq("goldfish").expr());
-
-      List<DataReaderFactory<UnsafeRow>> tasks = planTasks(reader);
-      Assert.assertEquals("Should create 1 task for 'goldfish' (g)", 1, tasks.size());
-    }
-
-    assertEqualsSafe(SCHEMA.asStruct(), expected(9),
-        read(location.toString(), "data = 'goldfish'"));
-  }
-
   @Test
   public void testFilterByNonProjectedColumn() {
     {
@@ -426,9 +357,9 @@ public void testFilterByNonProjectedColumn() {
       }
 
       assertEqualsSafe(actualProjection.asStruct(), expected, read(
-              unpartitioned.toString(),
-              "cast('2017-12-22 00:00:00+00:00' as timestamp) > ts",
-              "id", "data"));
+          unpartitioned.toString(),
+          "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)",
+          "id", "data"));
     }
 
     {
@@ -443,7 +374,7 @@ public void testFilterByNonProjectedColumn() {
       assertEqualsSafe(actualProjection.asStruct(), expected, read(
           unpartitioned.toString(),
           "ts > cast('2017-12-22 06:00:00+00:00' as timestamp) and " +
-              "cast('2017-12-22 08:00:00+00:00' as timestamp) > ts",
+              "ts < cast('2017-12-22 08:00:00+00:00' as timestamp)",
           "id"));
     }
   }
@@ -459,6 +390,16 @@ private static Record projectFlat(Schema projection, Record record) {
     return result;
   }
 
+  public static void assertEqualsUnsafe(Types.StructType struct,
+                                        List<Record> expected, List<UnsafeRow> actual) {
+    // TODO: match records by ID
+    int numRecords = Math.min(expected.size(), actual.size());
+    for (int i = 0; i < numRecords; i += 1) {
+      TestHelpers.assertEqualsUnsafe(struct, expected.get(i), actual.get(i));
+    }
+    Assert.assertEquals("Number of results should match expected", expected.size(), actual.size());
+  }
+
   public static void assertEqualsSafe(Types.StructType struct,
                                       List<Record> expected, List<Row> actual) {
     // TODO: match records by ID
@@ -477,26 +418,10 @@ public static void assertEqualsSafe(Types.StructType struct,
     return expected;
   }
 
-  private void pushFilters(DataSourceReader reader,
-                           com.netflix.iceberg.expressions.Expression... filters) {
-    Expression[] expressions = new Expression[filters.length];
-    for (int i = 0; i < filters.length; i += 1) {
-      expressions[i] = SparkExpressions.convert(filters[i], SCHEMA);
-    }
-    pushFilters(reader, expressions);
-  }
-
-  private void pushFilters(DataSourceReader reader,
-                           Expression... expressions) {
-    Assert.assertTrue(reader instanceof SupportsPushDownCatalystFilters);
-    SupportsPushDownCatalystFilters filterable = (SupportsPushDownCatalystFilters) reader;
-    filterable.pushCatalystFilters(expressions);
-  }
-
-  private List<DataReaderFactory<UnsafeRow>> planTasks(DataSourceReader reader) {
-    Assert.assertTrue(reader instanceof SupportsScanUnsafeRow);
-    SupportsScanUnsafeRow unsafeReader = (SupportsScanUnsafeRow) reader;
-    return unsafeReader.createUnsafeRowReaderFactories();
+  private void pushFilters(DataSourceReader reader, Filter... filters) {
+    Assert.assertTrue(reader instanceof SupportsPushDownFilters);
+    SupportsPushDownFilters filterable = (SupportsPushDownFilters) reader;
+    filterable.pushFilters(filters);
   }
 
   private File buildPartitionedTable(String desc, PartitionSpec spec, String udf, String partitionColumn) {
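
   In TestFilteredScan above, predicate pushdown now goes through the public Filter API rather than Catalyst expressions, and task planning comes straight from DataSourceReader.planInputPartitions(). The helper below is a hypothetical condensation of that pattern; FilterPushdownSketch and planWithIdFilter are names invented here, and any DataSourceReader produced by IcebergSource.createReader() could be passed in.

       import java.util.List;
       import org.apache.spark.sql.catalyst.InternalRow;
       import org.apache.spark.sql.sources.EqualTo;
       import org.apache.spark.sql.sources.Filter;
       import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
       import org.apache.spark.sql.sources.v2.reader.InputPartition;
       import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;

       class FilterPushdownSketch {
         static List<InputPartition<InternalRow>> planWithIdFilter(DataSourceReader reader, int id) {
           // Spark 2.4 readers advertise pushdown through SupportsPushDownFilters;
           // the Catalyst-expression mixin used by the old test no longer applies.
           SupportsPushDownFilters filterable = (SupportsPushDownFilters) reader;
           filterable.pushFilters(new Filter[] { EqualTo.apply("id", id) });
           return reader.planInputPartitions();
         }
       }
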
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestOrcScan.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestOrcScan.java
deleted file mode 100644
index 4a6fb26..0000000
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestOrcScan.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.netflix.iceberg.spark.source;
-
-import com.netflix.iceberg.DataFile;
-import com.netflix.iceberg.DataFiles;
-import com.netflix.iceberg.FileFormat;
-import com.netflix.iceberg.Metrics;
-import com.netflix.iceberg.PartitionSpec;
-import com.netflix.iceberg.Schema;
-import com.netflix.iceberg.Table;
-import com.netflix.iceberg.hadoop.HadoopTables;
-import com.netflix.iceberg.io.FileAppender;
-import com.netflix.iceberg.orc.ORC;
-import com.netflix.iceberg.orc.OrcFileAppender;
-import com.netflix.iceberg.spark.data.AvroDataTest;
-import com.netflix.iceberg.spark.data.RandomData;
-import com.netflix.iceberg.spark.data.SparkOrcWriter;
-import com.netflix.iceberg.spark.data.TestHelpers;
-import com.netflix.iceberg.types.Type;
-import com.netflix.iceberg.types.Types;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.storage.serde2.io.DateWritable;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
-import org.apache.spark.sql.catalyst.util.ArrayData;
-import org.apache.spark.sql.catalyst.util.DateTimeUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-
-import static com.netflix.iceberg.Files.localOutput;
-
-public class TestOrcScan extends AvroDataTest {
-  private static final Configuration CONF = new Configuration();
-
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-
-  private static SparkSession spark = null;
-
-  @BeforeClass
-  public static void startSpark() {
-    TestOrcScan.spark = SparkSession.builder().master("local[2]").getOrCreate();
-  }
-
-  @AfterClass
-  public static void stopSpark() {
-    SparkSession spark = TestOrcScan.spark;
-    TestOrcScan.spark = null;
-    spark.stop();
-  }
-
-  @Override
-  protected void writeAndValidate(Schema schema) throws IOException {
-    System.out.println("Starting ORC test with " + schema);
-    final int ROW_COUNT = 100;
-    final long SEED = 1;
-    File parent = temp.newFolder("orc");
-    File location = new File(parent, "test");
-    File dataFolder = new File(location, "data");
-    dataFolder.mkdirs();
-
-    File orcFile = new File(dataFolder,
-        FileFormat.ORC.addExtension(UUID.randomUUID().toString()));
-
-    HadoopTables tables = new HadoopTables(CONF);
-    Table table = tables.create(schema, PartitionSpec.unpartitioned(),
-        location.toString());
-
-    // Important: use the table's schema for the rest of the test
-    // When tables are created, the column ids are reassigned.
-    Schema tableSchema = table.schema();
-
-    Metrics metrics;
-    SparkOrcWriter writer = new SparkOrcWriter(ORC.write(localOutput(orcFile))
-        .schema(tableSchema)
-        .build());
-    try {
-      writer.addAll(RandomData.generateSpark(tableSchema, ROW_COUNT, SEED));
-    } finally {
-      writer.close();
-      // close writes the last batch, so metrics are not correct until after close is called
-      metrics = writer.metrics();
-    }
-
-    DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
-        .withFileSizeInBytes(orcFile.length())
-        .withPath(orcFile.toString())
-        .withMetrics(metrics)
-        .build();
-
-    table.newAppend().appendFile(file).commit();
-
-    Dataset<Row> df = spark.read()
-        .format("iceberg")
-        .load(location.toString());
-
-    List<Row> rows = df.collectAsList();
-    Assert.assertEquals("Wrong number of rows", ROW_COUNT, rows.size());
-    Iterator<InternalRow> expected = RandomData.generateSpark(tableSchema,
-        ROW_COUNT, SEED);
-    for(int i=0; i < ROW_COUNT; ++i) {
-      TestHelpers.assertEquals("row " + i, schema.asStruct(), expected.next(),
-          rows.get(i));
-    }
-  }
-}
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestOrcWrite.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestOrcWrite.java
deleted file mode 100644
index bc12670..0000000
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestOrcWrite.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Copyright 2018 Hortonworks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.netflix.iceberg.spark.source;
-
-import com.google.common.collect.Lists;
-import com.netflix.iceberg.FileFormat;
-import com.netflix.iceberg.PartitionSpec;
-import com.netflix.iceberg.Schema;
-import com.netflix.iceberg.Table;
-import com.netflix.iceberg.hadoop.HadoopTables;
-import com.netflix.iceberg.types.Types;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcConf;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-
-import static com.netflix.iceberg.types.Types.NestedField.optional;
-
-public class TestOrcWrite {
-  private static final Configuration CONF = new Configuration();
-  private static final Schema SCHEMA = new Schema(
-      optional(1, "id", Types.IntegerType.get()),
-      optional(2, "data", Types.StringType.get())
-  );
-
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-
-  private static SparkSession spark = null;
-
-  @BeforeClass
-  public static void startSpark() {
-    TestOrcWrite.spark = SparkSession.builder().master("local[2]").getOrCreate();
-  }
-
-  @AfterClass
-  public static void stopSpark() {
-    SparkSession spark = TestOrcWrite.spark;
-    TestOrcWrite.spark = null;
-    spark.stop();
-  }
-
-  @Test
-  public void testBasicWrite() throws IOException {
-    File parent = temp.newFolder("orc");
-    File location = new File(parent, "test");
-    location.mkdirs();
-
-    HadoopTables tables = new HadoopTables(CONF);
-    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
-    Table table = tables.create(SCHEMA, spec, location.toString());
-    table.updateProperties()
-        .defaultFormat(FileFormat.ORC)
-        .set(OrcConf.COMPRESS.getAttribute(), CompressionKind.NONE.name())
-        .commit();
-
-    List<SimpleRecord> expected = Lists.newArrayList(
-        new SimpleRecord(1, "a"),
-        new SimpleRecord(2, "b"),
-        new SimpleRecord(3, "c")
-    );
-
-    Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
-
-    // TODO: incoming columns must be ordered according to the table's schema
-    df.select("id", "data").write()
-        .format("iceberg")
-        .mode("append")
-        .save(location.toString());
-
-    table.refresh();
-
-    Dataset<Row> result = spark.read()
-        .format("iceberg")
-        .load(location.toString());
-
-    List<SimpleRecord> actual = result.orderBy("id").as(
-        Encoders.bean(SimpleRecord.class)).collectAsList();
-
-    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
-    Assert.assertEquals("Result rows should match", expected, actual);
-  }
-}
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestSparkReadProjection.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestSparkReadProjection.java
index be6876b..d35bba3 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestSparkReadProjection.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestSparkReadProjection.java
@@ -88,7 +88,7 @@ public static void stopSpark() {
 
   @Override
   protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema,
-                                            Record record) throws IOException {
+                                Record record) throws IOException {
     File parent = temp.newFolder(desc);
     File location = new File(parent, "test");
     File dataFolder = new File(location, "data");
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java
index fc9253e..a7ff513 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestTables.java
@@ -51,9 +51,18 @@ static TestTable create(File temp, String name, Schema schema, PartitionSpec spe
 
   static TestTable load(String name) {
     TestTableOperations ops = new TestTableOperations(name);
+    if (ops.current() == null) {
+      return null;
+    }
     return new TestTable(ops, name);
   }
 
+  static boolean drop(String name) {
+    synchronized (METADATA) {
+      return METADATA.remove(name) != null;
+    }
+  }
+
   static class TestTable extends BaseTable {
     private final TestTableOperations ops;
 


 
